This queue is for tickets about the Net-Stomp CPAN distribution.

Report information
The Basics
Id: 68160
Status: resolved
Priority: 0/
Queue: Net-Stomp

People
Owner: Nobody in particular
Requestors: mstemm [...] cloudmark.com
Cc:
AdminCc:

Bug Information
Severity: Normal
Broken in: 0.41
Fixed in: (no value)

Attachments
01-cloudmark-disconnect-improvements-README.txt
01-cloudmark-disconnect-improvements-v2.patch
01-cloudmark-disconnect-improvements.patch



Subject: Improvements in the face of broker disconnects
Hi, thanks for writing the useful Net::Stomp library. We at Cloudmark have made some improvements to the library to gracefully handle sending/receiving messages in the face of brokers going down and coming back up.

I've attached a summary of the changes and a patch file based on the 0.41 tarball that incorporates the changes.

If you have further improvements or refinements you'd like to see, we'd be happy to make them. Otherwise, enjoy the changes.
Subject: 01-cloudmark-disconnect-improvements-README.txt
We made the following improvements to the Net::Stomp library to be more
robust and return more concrete errors in the face of broker disconnects:

- Many functions didn't have a clearly documented return value. Updated
  all functions to have a return value. Generally 1 implies success and
  0 implies failure. See the perl docs for more details on the return
  values from each function.
- Add a new option to the constructor 'ignore_sigpipe'. If provided and
  set to 1, it will ignore SIGPIPE so writes to closed sockets don't
  result in killing the process.
- Add a new option to the constructor 'reconnect_attempts'. If provided
  and set to non-zero, when sending/receiving messages, the library will
  only attempt to reconnect this many times before giving up and
  returning an error. By default, this is set to 0, which retains the
  old behavior of attempting to reconnect forever on any failure.
- In connect(), verify that a response frame was actually read and
  actually had a CONNECTED response.
- Whenever calling functions like send/send_frame internally, check the
  return value and stop if the action does not succeed.
- Add new functions send_with_receipt/send_frame_with_receipt that send
  messages with a receipt header, which forces the server to reply with
  a RECEIPT message.
- Modify higher-level functions like subscribe/send_transactional to
  always use send_with_receipt/send_frame_with_receipt, to avoid
  problems where an ERROR response from a message that would otherwise
  not return anything ends up being accidentally read as the response to
  a later message.
- Monitor the value of syswrite when writing messages to the socket to
  ensure that all of the message was written.
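The reconnect_attempts semantics described above (0 means retry forever, N means give up after N tries) can be sketched in plain Perl. This is only an illustration: try_with_limit and the flaky action are hypothetical names, not part of Net::Stomp or the attached patch, and the recovery step (sleeping and reconnecting in the patch) is reduced to an optional callback.

```perl
use strict;
use warnings;

# Hypothetical helper, not part of Net::Stomp: call $action until it
# returns true. $limit == 0 means "retry forever", matching the
# reconnect_attempts default described in the README. $recover, if
# given, runs after each failed try (the patch sleeps and reconnects
# at that point).
sub try_with_limit {
    my ( $limit, $action, $recover ) = @_;
    for ( my $left = $limit; $limit == 0 || $left > 0; $left-- ) {
        return 1 if $action->();
        $recover->() if $recover;
    }
    return 0;    # attempts exhausted
}

# Usage: an action that fails twice and then succeeds.
my $calls = 0;
my $flaky = sub { return ++$calls >= 3 };

my $ok_with_5 = try_with_limit( 5, $flaky );    # succeeds on the third call
$calls = 0;
my $ok_with_2 = try_with_limit( 2, $flaky );    # gives up after two calls
print "limit 5: $ok_with_5, limit 2: $ok_with_2\n";
```

With a limit of 0 the loop condition never becomes false, so a caller that wants a bounded wait has to pass a non-zero limit.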
Subject: 01-cloudmark-disconnect-improvements.patch
Index: lib/Net/Stomp.pm
===================================================================
13c13
<     ssl_options subscriptions _connect_headers bufsize
---
>     ssl_options subscriptions _connect_headers bufsize reconnect_attempts ignore_sigpipe
29a30,40
>     # Make the number of reconnection attempts per send/receive
>     # attempt configurable. The default is 0 (retry forever).
>     if (! defined $self->reconnect_attempts) {
>         $self->reconnect_attempts(0);
>     }
>
>     # If configured to do so, ignore SIGPIPE.
>     if (defined $self->ignore_sigpipe && $self->ignore_sigpipe == 1) {
>         $SIG{PIPE} = 'IGNORE';
>     }
>
111c122
<     my ( $self, $conf ) = @_;
---
>     my ( $self, $conf, $response_frame_ref ) = @_;
115c126,129
<     $self->send_frame($frame);
---
>     if ($self->send_frame($frame) == 0) {
>         return undef;
>     }
>
117a132,148
>     # If response_frame_ref is defined, set it to the response.
>     if (defined $response_frame_ref) {
>         $$response_frame_ref = $frame;
>     }
>
>     # If receive_frame didn't actually return anything, we can return
>     # failure immediately.
>     if (! defined $frame) {
>         return 0;
>     }
>
>     # If receive_frame didn't return a CONNECTED response, return
>     # failure.
>     if ($frame->command ne "CONNECTED") {
>         return 0;
>     }
>
123c154
<     return $frame;
---
>     return 1;
128a160,161
>
>     # Note: ignoring result from send_frame.
135a169
>
140,142c174,175
<     while ($@) {
<         sleep(5);
<         eval { $self->_get_connection };
---
>     if ($@) {
>         return;
144c177,182
<     $self->connect( $self->_connect_headers );
---
>
>     if ($self->connect( $self->_connect_headers ) == 0) {
>         $self->socket->close;
>         return;
>     }
>
146c184,187
<         $self->subscribe($self->subscriptions->{$sub});
---
>         if ($self->subscribe($self->subscriptions->{$sub}) == 0) {
>             $self->socket->close;
>             return;
>         }
173c214,225
<     $self->send_frame($frame);
---
>     return $self->send_frame($frame);
> }
>
> sub send_with_receipt {
>     my ($self, $conf, $send_response_ref) = @_;
>
>     my $body = $conf->{body};
>     delete $conf->{body};
>     my $frame = Net::Stomp::Frame->new(
>         { command => 'SEND', headers => $conf, body => $body } );
>
>     return $self->send_frame_with_receipt($frame, $send_response_ref);
180a233,239
>     # Note: as this function depends on a sequence of messages, any of
>     # which *may* return an error from the broker on failure, we send
>     # all messages using send_with_receipt/send_frame_with_receipt. If
>     # we didn't include receipt ids, one of these messages could
>     # return 0 or 1 responses back, which throws off the pipeline for
>     # subsequent messages or subsequent calls to send_transactional.
>
182a242
>
187c247,249
<     $self->send_frame($begin_frame);
---
>     if ($self->send_frame_with_receipt($begin_frame) == 0) {
>         return 0;
>     }
190,200d251
<     my $receipt_id = $self->_get_next_transaction;
<     $conf->{receipt} = $receipt_id;
<     my $message_frame = Net::Stomp::Frame->new(
<         { command => 'SEND', headers => $conf, body => $body } );
<     $self->send_frame($message_frame);
<
<     # check the receipt
<     my $receipt_frame = $self->receive_frame;
<     if ( $receipt_frame->command eq 'RECEIPT'
<         && $receipt_frame->headers->{'receipt-id'} eq $receipt_id )
<     {
202,209c253,254
<         # success, commit the transaction
<         my $frame_commit = Net::Stomp::Frame->new(
<             { command => 'COMMIT',
<               headers => { transaction => $transaction_id }
<             }
<         );
<         return $self->send_frame($frame_commit);
<     } else {
---
>     if ($self->send_with_receipt($conf, undef) == 0) {
>         # The send wasn't acknowledged. ABORT the transaction.
211d255
<         # some failure, abort transaction
217c261,264
<         $self->send_frame($frame_abort);
---
>
>         # Note: ignoring result from send_frame_with_receipt, as we're
>         # going to return an error anyway.
>         $self->send_frame_with_receipt($frame_abort);
219a267,274
>
>     # success, commit the transaction
>     my $frame_commit = Net::Stomp::Frame->new(
>         { command => 'COMMIT',
>           headers => { transaction => $transaction_id }
>         }
>     );
>     return $self->send_frame_with_receipt($frame_commit);
226c281,283
<     $self->send_frame($frame);
---
>     if ($self->send_frame_with_receipt($frame) == 0) {
>         return 0;
>     }
228a286,287
>
>     return 1;
235c294,296
<     $self->send_frame($frame);
---
>     if ($self->send_frame_with_receipt($frame) == 0) {
>         return 0;
>     }
237a299,300
>
>     return 1;
245c308
<     $self->send_frame($frame);
---
>     return $self->send_frame_with_receipt($frame);
251,254c314,331
<     # warn "send [" . $frame->as_string . "]\n";
<     $self->socket->syswrite( $frame->as_string );
<     my $connected = $self->socket->connected;
<     unless (defined $connected) {
---
>     # Try up to reconnect_attempts times to send the frame (or
>     # forever, if reconnect_attempts is 0).
>
>     for(my $attempts = $self->reconnect_attempts;
>         $self->reconnect_attempts == 0 || $attempts > 0;
>         $attempts--) {
>
>         # warn "send [" . $frame->as_string . "]\n";
>         my $written = $self->socket->syswrite( $frame->as_string );
>
>         if (defined $written && $written == length($frame->as_string)) {
>             return 1;
>         }
>
>         # If here, we couldn't send the frame, which was either due to a
>         # short write or due to an inability to write at all. In either
>         # case, try to reconnect.
>         sleep(5);
256d332
<         $self->send_frame($frame);
257a334,371
>
>     # If here, we couldn't send. Give up.
>     return 0;
> }
>
> sub send_frame_with_receipt {
>     my ($self, $frame, $send_response_ref) = @_;
>
>     if (defined $send_response_ref) {
>         $$send_response_ref = undef;
>     }
>
>     my $receipt_id = $self->_get_next_transaction;
>     $frame->headers->{receipt} = $receipt_id;
>
>     if ($self->send_frame($frame) == 0) {
>         return 0;
>     }
>
>     # Read the response, which will either be a RECEIPT or some
>     # ERROR. Always wait up to 30 seconds for a response.
>
>     my $response = $self->receive_frame({timeout => 30});
>
>     if (! defined $response) {
>         return 0;
>     }
>
>     if (defined $send_response_ref) {
>         $$send_response_ref = $response;
>     }
>
>     if ($response->command ne 'RECEIPT' ||
>         $response->headers->{'receipt-id'} ne $receipt_id) {
>         return 0;
>     }
>
>     return 1;
339c453,460
<     unless (defined $connected) {
---
>
>     # Try up to reconnect_attempts times to connect (or
>     # forever, if reconnect_attempts is 0).
>     for(my $attempts = $self->reconnect_attempts;
>         (! defined $connected && ($self->reconnect_attempts == 0 || $attempts > 0));
>         $attempts--) {
>
>         sleep(5);
340a462
>         $connected = $self->socket->connected;
342a465,467
>     # If we still can't connect, give up.
>     return undef if (! defined $connected);
>
467a593,615
> By default, if the broker goes away, socket writes could result in a
> SIGPIPE signal, terminating the process. If you pass the option
> ignore_sigpipe to new with value 1, the library will set up a signal
> handler for SIGPIPE that ignores the signal:
>
>   my $stomp = Net::Stomp->new( {
>     hostname => 'localhost',
>     port => '61612',
>     ignore_sigpipe => 1
>   } );
>
> By default, when sending/receiving a message, if the library is not
> currently connected to the broker, it will attempt to reconnect
> (forever) before sending/receiving the message. If you want to limit
> the number of reconnection attempts that occur before giving up and
> returning an error, pass the option reconnect_attempts in new():
>
>   my $stomp = Net::Stomp->new( {
>     hostname => 'localhost',
>     port => '61612',
>     reconnect_attempts => 5
>   } );
>
481,482c629,631
< This connects to the Stomp server. You may pass in a C<login> and
< C<passcode> options.
---
> This connects to the Stomp server. The first argument is a hash of
> headers to include along with the CONNECT message. In that hash, you
> may pass in a C<login> and C<passcode> options.
488c637,644
<   $stomp->connect( { login => 'hello', passcode => 'there' } );
---
> The second (optional) argument is a scalar reference which passes back
> the response from the CONNECT message. It will either be set to undef
> if the connect failed, or a Net::Stomp::Frame object containing the
> response.
>
>   $stomp->connect( { login => 'hello', passcode => 'there' }, \$connect_response );
>
> connect() returns 1 on success, 0 on failure.
498c654,678
< To send a BytesMessage, you should set the field 'bytes_message' to 1.
---
> To send a BytesMessage, you should set the field 'bytes_message' to
> 1.
>
> This function returns 1 on success, 0 on failure.
>
> =head2 send_with_receipt
>
> This sends a message to a queue or topic, and requires a positive
> acknowledgement via a RECEIPT response. You must pass in a destination
> and a body.
>
> The second (optional) argument is a scalar reference which passes back
> the response from the SEND message. It will either be set to undef if
> the message could not be sent, or a Net::Stomp::Frame object
> containing the response. This response will either be a RECEIPT
> message or some ERROR message.
>
>   $stomp->send_with_receipt(
>     { destination => '/queue/foo', body => 'test message' }, \$send_response );
>
> To send a BytesMessage, you should set the field 'bytes_message' to
> 1.
>
> This function returns 1 on success (i.e. the message was successfully
> sent and a RECEIPT message received), or 0 on failure.
514a695,697
> This function returns 1 on success (i.e. the message was sent and
> committed), 0 on failure.
>
575a759,760
> This function returns 1 on success, 0 on failure.
>
581a767,768
> This function returns 1 on success, 0 on failure.
>
596a784,786
> This function returns a Net::Stomp::Frame object on success, undef on
> failure (or timeout).
>
624a815,835
> This function returns 1 if the frame could be sent, 0 otherwise.
>
> =head2 send_frame_with_receipt
>
> This sends a single frame and requires a positive acknowledgement via
> a RECEIPT response.
>
> The second (optional) argument is a scalar reference which passes back
> the response from sending the frame. It will either be set to undef if
> the frame could not be sent, or a Net::Stomp::Frame object
> containing the response. This response will either be a RECEIPT
> message or some ERROR message.
>
>   my $frame = Net::Stomp::Frame->new(
>     { command => $command, headers => $conf, body => $body } );
>
>   $stomp->send_frame_with_receipt($frame, \$send_response);
>
> This function returns 1 on success (i.e. the message was successfully
> sent and a RECEIPT message received), or 0 on failure.
>
635a847
> Mark Stemm <mstemm@cloudmark.com>
From: mstemm [...] cloudmark.com
Whoops, the prior patch had a bug that prevented message bodies from being included when using send_transactional. Here is an updated version.

On Thu May 12 20:32:31 2011, markstemm wrote:
> Hi, thanks for writing the useful Net::Stomp library. We at Cloudmark
> have made some improvements to the library to gracefully handle
> sending/receiving messages in the face of brokers going down and coming
> back up.
>
> I've attached a summary of the changes and a patch file based on the
> 0.41 tarball that incorporates the changes.
>
> If you have further improvements or refinements you'd like to see, we'd
> be happy to make them. Otherwise, enjoy the changes.
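One plausible reading of the missing-body bug, going only by the two diffs: in the first patch send_transactional still removed the body from the headers hash before handing that same hash to send_with_receipt, which then looked for the body again and found nothing. The sketch below reproduces that shape with plain hashes. build_send, transactional_v1 and transactional_v2 are hypothetical stand-ins, not functions from Net::Stomp, and the transaction handling is left out.

```perl
use strict;
use warnings;

# Stand-in for send_with_receipt from the patch: it takes the body
# out of the headers hash before building the frame.
sub build_send {
    my ($conf) = @_;
    my $body = delete $conf->{body};
    return { headers => $conf, body => $body };
}

# First-patch shape: the caller removes the body, then passes the
# same hash reference on.
sub transactional_v1 {
    my ($conf) = @_;
    my $body = delete $conf->{body};    # body is gone from %$conf here
    return build_send($conf);           # so the frame is built without it
}

# Second-patch shape: the caller leaves the body in place.
sub transactional_v2 {
    my ($conf) = @_;
    return build_send($conf);
}

my $v1 = transactional_v1( { destination => '/queue/foo', body => 'test message' } );
my $v2 = transactional_v2( { destination => '/queue/foo', body => 'test message' } );
printf "v1 body: %s\n", defined $v1->{body} ? $v1->{body} : '(undef)';
printf "v2 body: %s\n", $v2->{body};
```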
Subject: 01-cloudmark-disconnect-improvements-v2.patch
Index: lib/Net/Stomp.pm
===================================================================
13c13
<     ssl_options subscriptions _connect_headers bufsize
---
>     ssl_options subscriptions _connect_headers bufsize reconnect_attempts ignore_sigpipe
29a30,40
>     # Make the number of reconnection attempts per send/receive
>     # attempt configurable. The default is 0 (retry forever).
>     if (! defined $self->reconnect_attempts) {
>         $self->reconnect_attempts(0);
>     }
>
>     # If configured to do so, ignore SIGPIPE.
>     if (defined $self->ignore_sigpipe && $self->ignore_sigpipe == 1) {
>         $SIG{PIPE} = 'IGNORE';
>     }
>
111c122
<     my ( $self, $conf ) = @_;
---
>     my ( $self, $conf, $response_frame_ref ) = @_;
115c126,129
<     $self->send_frame($frame);
---
>     if ($self->send_frame($frame) == 0) {
>         return undef;
>     }
>
117a132,148
>     # If response_frame_ref is defined, set it to the response.
>     if (defined $response_frame_ref) {
>         $$response_frame_ref = $frame;
>     }
>
>     # If receive_frame didn't actually return anything, we can return
>     # failure immediately.
>     if (! defined $frame) {
>         return 0;
>     }
>
>     # If receive_frame didn't return a CONNECTED response, return
>     # failure.
>     if ($frame->command ne "CONNECTED") {
>         return 0;
>     }
>
123c154
<     return $frame;
---
>     return 1;
128a160,161
>
>     # Note: ignoring result from send_frame.
135a169
>
140,142c174,175
<     while ($@) {
<         sleep(5);
<         eval { $self->_get_connection };
---
>     if ($@) {
>         return;
144c177,182
<     $self->connect( $self->_connect_headers );
---
>
>     if ($self->connect( $self->_connect_headers ) == 0) {
>         $self->socket->close;
>         return;
>     }
>
146c184,187
<         $self->subscribe($self->subscriptions->{$sub});
---
>         if ($self->subscribe($self->subscriptions->{$sub}) == 0) {
>             $self->socket->close;
>             return;
>         }
173c214
<     $self->send_frame($frame);
---
>     return $self->send_frame($frame);
176,177c217,219
< sub send_transactional {
<     my ( $self, $conf ) = @_;
---
> sub send_with_receipt {
>     my ($self, $conf, $send_response_ref) = @_;
>
179a222,236
>     my $frame = Net::Stomp::Frame->new(
>         { command => 'SEND', headers => $conf, body => $body } );
>
>     return $self->send_frame_with_receipt($frame, $send_response_ref);
> }
>
> sub send_transactional {
>     my ( $self, $conf ) = @_;
>
>     # Note: as this function depends on a sequence of messages, any of
>     # which *may* return an error from the broker on failure, we send
>     # all messages using send_with_receipt/send_frame_with_receipt. If
>     # we didn't include receipt ids, one of these messages could
>     # return 0 or 1 responses back, which throws off the pipeline for
>     # subsequent messages or subsequent calls to send_transactional.
182a240
>
187c245,247
<     $self->send_frame($begin_frame);
---
>     if ($self->send_frame_with_receipt($begin_frame) == 0) {
>         return 0;
>     }
190,200d249
<     my $receipt_id = $self->_get_next_transaction;
<     $conf->{receipt} = $receipt_id;
<     my $message_frame = Net::Stomp::Frame->new(
<         { command => 'SEND', headers => $conf, body => $body } );
<     $self->send_frame($message_frame);
<
<     # check the receipt
<     my $receipt_frame = $self->receive_frame;
<     if ( $receipt_frame->command eq 'RECEIPT'
<         && $receipt_frame->headers->{'receipt-id'} eq $receipt_id )
<     {
202,209c251,252
<         # success, commit the transaction
<         my $frame_commit = Net::Stomp::Frame->new(
<             { command => 'COMMIT',
<               headers => { transaction => $transaction_id }
<             }
<         );
<         return $self->send_frame($frame_commit);
<     } else {
---
>     if ($self->send_with_receipt($conf, undef) == 0) {
>         # The send wasn't acknowledged. ABORT the transaction.
211d253
<         # some failure, abort transaction
217c259,262
<         $self->send_frame($frame_abort);
---
>
>         # Note: ignoring result from send_frame_with_receipt, as we're
>         # going to return an error anyway.
>         $self->send_frame_with_receipt($frame_abort);
219a265,272
>
>     # success, commit the transaction
>     my $frame_commit = Net::Stomp::Frame->new(
>         { command => 'COMMIT',
>           headers => { transaction => $transaction_id }
>         }
>     );
>     return $self->send_frame_with_receipt($frame_commit);
226c279,281
<     $self->send_frame($frame);
---
>     if ($self->send_frame_with_receipt($frame) == 0) {
>         return 0;
>     }
228a284,285
>
>     return 1;
235c292,294
<     $self->send_frame($frame);
---
>     if ($self->send_frame_with_receipt($frame) == 0) {
>         return 0;
>     }
237a297,298
>
>     return 1;
245c306
<     $self->send_frame($frame);
---
>     return $self->send_frame_with_receipt($frame);
251,254c312,329
<     # warn "send [" . $frame->as_string . "]\n";
<     $self->socket->syswrite( $frame->as_string );
<     my $connected = $self->socket->connected;
<     unless (defined $connected) {
---
>     # Try up to reconnect_attempts times to send the frame (or
>     # forever, if reconnect_attempts is 0).
>
>     for(my $attempts = $self->reconnect_attempts;
>         $self->reconnect_attempts == 0 || $attempts > 0;
>         $attempts--) {
>
>         # warn "send [" . $frame->as_string . "]\n";
>         my $written = $self->socket->syswrite( $frame->as_string );
>
>         if (defined $written && $written == length($frame->as_string)) {
>             return 1;
>         }
>
>         # If here, we couldn't send the frame, which was either due to a
>         # short write or due to an inability to write at all. In either
>         # case, try to reconnect.
>         sleep(5);
256d330
<         $self->send_frame($frame);
257a332,369
>
>     # If here, we couldn't send. Give up.
>     return 0;
> }
>
> sub send_frame_with_receipt {
>     my ($self, $frame, $send_response_ref) = @_;
>
>     if (defined $send_response_ref) {
>         $$send_response_ref = undef;
>     }
>
>     my $receipt_id = $self->_get_next_transaction;
>     $frame->headers->{receipt} = $receipt_id;
>
>     if ($self->send_frame($frame) == 0) {
>         return 0;
>     }
>
>     # Read the response, which will either be a RECEIPT or some
>     # ERROR. Always wait up to 30 seconds for a response.
>
>     my $response = $self->receive_frame({timeout => 30});
>
>     if (! defined $response) {
>         return 0;
>     }
>
>     if (defined $send_response_ref) {
>         $$send_response_ref = $response;
>     }
>
>     if ($response->command ne 'RECEIPT' ||
>         $response->headers->{'receipt-id'} ne $receipt_id) {
>         return 0;
>     }
>
>     return 1;
339c451,458
<     unless (defined $connected) {
---
>
>     # Try up to reconnect_attempts times to connect (or
>     # forever, if reconnect_attempts is 0).
>     for(my $attempts = $self->reconnect_attempts;
>         (! defined $connected && ($self->reconnect_attempts == 0 || $attempts > 0));
>         $attempts--) {
>
>         sleep(5);
340a460
>         $connected = $self->socket->connected;
342a463,465
>     # If we still can't connect, give up.
>     return undef if (! defined $connected);
>
467a591,613
> By default, if the broker goes away, socket writes could result in a
> SIGPIPE signal, terminating the process. If you pass the option
> ignore_sigpipe to new with value 1, the library will set up a signal
> handler for SIGPIPE that ignores the signal:
>
>   my $stomp = Net::Stomp->new( {
>     hostname => 'localhost',
>     port => '61612',
>     ignore_sigpipe => 1
>   } );
>
> By default, when sending/receiving a message, if the library is not
> currently connected to the broker, it will attempt to reconnect
> (forever) before sending/receiving the message. If you want to limit
> the number of reconnection attempts that occur before giving up and
> returning an error, pass the option reconnect_attempts in new():
>
>   my $stomp = Net::Stomp->new( {
>     hostname => 'localhost',
>     port => '61612',
>     reconnect_attempts => 5
>   } );
>
481,482c627,629
< This connects to the Stomp server. You may pass in a C<login> and
< C<passcode> options.
---
> This connects to the Stomp server. The first argument is a hash of
> headers to include along with the CONNECT message. In that hash, you
> may pass in a C<login> and C<passcode> options.
488c635,642
<   $stomp->connect( { login => 'hello', passcode => 'there' } );
---
> The second (optional) argument is a scalar reference which passes back
> the response from the CONNECT message. It will either be set to undef
> if the connect failed, or a Net::Stomp::Frame object containing the
> response.
>
>   $stomp->connect( { login => 'hello', passcode => 'there' }, \$connect_response );
>
> connect() returns 1 on success, 0 on failure.
498c652,676
< To send a BytesMessage, you should set the field 'bytes_message' to 1.
---
> To send a BytesMessage, you should set the field 'bytes_message' to
> 1.
>
> This function returns 1 on success, 0 on failure.
>
> =head2 send_with_receipt
>
> This sends a message to a queue or topic, and requires a positive
> acknowledgement via a RECEIPT response. You must pass in a destination
> and a body.
>
> The second (optional) argument is a scalar reference which passes back
> the response from the SEND message. It will either be set to undef if
> the message could not be sent, or a Net::Stomp::Frame object
> containing the response. This response will either be a RECEIPT
> message or some ERROR message.
>
>   $stomp->send_with_receipt(
>     { destination => '/queue/foo', body => 'test message' }, \$send_response );
>
> To send a BytesMessage, you should set the field 'bytes_message' to
> 1.
>
> This function returns 1 on success (i.e. the message was successfully
> sent and a RECEIPT message received), or 0 on failure.
514a693,695
> This function returns 1 on success (i.e. the message was sent and
> committed), 0 on failure.
>
575a757,758
> This function returns 1 on success, 0 on failure.
>
581a765,766
> This function returns 1 on success, 0 on failure.
>
596a782,784
> This function returns a Net::Stomp::Frame object on success, undef on
> failure (or timeout).
>
624a813,833
> This function returns 1 if the frame could be sent, 0 otherwise.
>
> =head2 send_frame_with_receipt
>
> This sends a single frame and requires a positive acknowledgement via
> a RECEIPT response.
>
> The second (optional) argument is a scalar reference which passes back
> the response from sending the frame. It will either be set to undef if
> the frame could not be sent, or a Net::Stomp::Frame object
> containing the response. This response will either be a RECEIPT
> message or some ERROR message.
>
>   my $frame = Net::Stomp::Frame->new(
>     { command => $command, headers => $conf, body => $body } );
>
>   $stomp->send_frame_with_receipt($frame, \$send_response);
>
> This function returns 1 on success (i.e. the message was successfully
> sent and a RECEIPT message received), or 0 on failure.
>
635a845
> Mark Stemm <mstemm@cloudmark.com>
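The acknowledgement check that send_frame_with_receipt performs in the patch (a response only counts if it is a RECEIPT carrying the receipt id that was sent) can be tried out without a broker. This is a minimal sketch under stated assumptions: frames are modelled as plain hashes rather than Net::Stomp::Frame objects, and is_matching_receipt and the receipt ids are hypothetical names used only for illustration.

```perl
use strict;
use warnings;

# Hypothetical stand-in for the check in send_frame_with_receipt:
# a response is an acknowledgement only if it is a RECEIPT frame
# carrying the receipt id we sent. Frames are plain hashes here.
sub is_matching_receipt {
    my ( $response, $receipt_id ) = @_;
    return 0 unless defined $response;                    # timeout or read failure
    return 0 unless $response->{command} eq 'RECEIPT';    # e.g. an ERROR frame
    my $got = $response->{headers}{'receipt-id'};
    return ( defined $got && $got eq $receipt_id ) ? 1 : 0;
}

my @cases = (
    [ 'matching receipt', { command => 'RECEIPT', headers => { 'receipt-id' => 'r-1' } } ],
    [ 'stale receipt',    { command => 'RECEIPT', headers => { 'receipt-id' => 'r-0' } } ],
    [ 'broker error',     { command => 'ERROR',   headers => { message => 'denied' } } ],
    [ 'no response',      undef ],
);

# Check every case against the receipt id we pretend to have sent.
my %result = map { $_->[0] => is_matching_receipt( $_->[1], 'r-1' ) } @cases;
print "$_: $result{$_}\n" for map { $_->[0] } @cases;
```

Only the first case is accepted; a stale receipt, an ERROR frame, and a missing response are all treated as failure, which is what keeps a late ERROR from being mistaken for the reply to a later message.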
Download (untitled) / with headers
text/plain 306b
I know this is an old ticket, but I like the features here. Would it ever be necessary not to ignore SIGPIPE? Net::Stomp already tries very hard to reconnect whenever it can, so ignoring SIGPIPE seems like it should be the default. The logic that limits the number of reconnects looks like it can be added as-is.
Download (untitled) / with headers
text/plain 559b
The next release will ignore SIGPIPE locally while writing, so we don't break whatever signal handling the rest of the process has set up. I have also implemented a limit on the number of reconnect attempts (separate for the initial socket connection, and for subsequent reconnections, for back-compat). I will document the return values of each method; I agree they are confusing. With all the reconnect logic, send_frame will never need to return "I couldn't send": it will either succeed or throw an exception when the reconnect attempts are exceeded.
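Ignoring SIGPIPE only for the duration of a write, and reporting failure as an exception, can be sketched with core Perl as follows. This is not the code that went into the release: write_or_die is a hypothetical helper, and a pipe with its reading end closed stands in for a broker that has gone away.

```perl
use strict;
use warnings;

# Hypothetical helper, not Net::Stomp's actual code: write $data to
# $fh with SIGPIPE ignored only inside this call, and die instead of
# returning a failure code.
sub write_or_die {
    my ( $fh, $data ) = @_;
    local $SIG{PIPE} = 'IGNORE';    # previous handler is restored on scope exit
    my $written = syswrite( $fh, $data );
    die "write failed: $!\n" unless defined $written;
    die "short write\n"      unless $written == length $data;
    return $written;
}

# Demonstration: a pipe whose reading end is already closed, so the
# write would normally raise SIGPIPE and kill the process.
pipe( my $reader, my $writer ) or die "pipe: $!";
close $reader;

my $ok    = eval { write_or_die( $writer, "SEND\n\n\0" ); 1 };
my $error = $@;
print "write raised: $error" unless $ok;
```

Because the handler is set with local, any SIGPIPE handling the rest of the process has installed is back in place as soon as the helper returns or dies.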
Download (untitled) / with headers
text/plain 214b
I think I addressed all your suggestions in 0.47. If I forgot or misunderstood something, please open another ticket. (Also, please send unified/contextual patches or GitHub pull requests; they're easier to read.)

