burnersk burnersk - 7 months ago 30
Perl Question

How to disconnect from RabbitMQ properly using Perl's AnyEvent::RabbitMQ?

I want to disconnect from RabbitMQ in a proper way. By reviewing the source code of Perl's AnyEvent::RabbitMQ (which I am using), I found the

close
method which seems to close all the channels opened to RabbitMQ.

So I


  1. connected to RabbitMQ

  2. opened a channel

  3. declared an exchange

  4. bound to that exchange

  5. declared a queue

  6. bound to that queue

  7. execute the
    close
    method on the
    AnyEvent::RabbitMQ
    instance (not the
    ::Channel
    instance)



The connection seems to be closed but the RabbitMQ logs shows that the "AMQP connection" was "connection_closed_abruptly".

Here is the complete RabbitMQ log for that connection:

=INFO REPORT==== 14-Jan-2016::10:02:15 ===
accepting AMQP connection <0.10868.0> (127.0.0.1:57764 -> 127.0.0.1:5672)

=WARNING REPORT==== 14-Jan-2016::10:02:16 ===
closing AMQP connection <0.10868.0> (127.0.0.1:57764 -> 127.0.0.1:5672):
connection_closed_abruptly


Here is the sample code:

#!/usr/bin/perl
use strictures 1;

use AnyEvent::RabbitMQ;
use Data::Printer;

my ( $rabbitmq, $rabbitmq_channel );

my $condvar = AnyEvent->condvar;

$rabbitmq = AnyEvent::RabbitMQ->new->load_xml_spec()->connect(
host => '127.0.0.1',
port => 5672,
user => 'guest',
pass => 'guest',
vhost => '/',
timeout => 1,
tls => 0,
tune => { heartbeat => 1 },
on_success => sub {
($rabbitmq) = @_;
$rabbitmq->open_channel(
on_success => sub {
($rabbitmq_channel) = @_;
$rabbitmq_channel->confirm;
$rabbitmq_channel->declare_exchange(
exchange => 'test_exchange',
type => 'fanout',
on_success => sub {
$rabbitmq_channel->bind_exchange(
source => 'test_exchange',
destination => 'test_exchange',
routing_key => '',
on_success => sub {
$rabbitmq_channel->declare_queue(
queue => 'test_queue',
on_success => sub {
$rabbitmq_channel->bind_queue(
queue => 'test_queue',
exchange => 'test_exchange',
routing_key => '',
on_success => sub {
$rabbitmq->close;
undef $rabbitmq;
},
on_failure => sub { $condvar->send( __LINE__, @_ ) },
);
},
on_failure => sub { $condvar->send( __LINE__, @_ ) },
);
},
on_failure => sub { $condvar->send( __LINE__, @_ ) },
);
},
on_failure => sub { $condvar->send( __LINE__, @_ ) },
);
},
on_failure => sub { $condvar->send( __LINE__, @_ ) },
on_return => sub { $condvar->send( __LINE__, @_ ) },
on_close => sub { $condvar->send( __LINE__, @_ ) },
);
},
on_failure => sub { $condvar->send( __LINE__, @_ ) },
on_read_failure => sub { $condvar->send( __LINE__, @_ ) },
on_return => sub { $condvar->send( __LINE__, @_ ) },
on_close => sub { $condvar->send( __LINE__, @_ ) },
);

my $reason = [ $condvar->recv ];
p $reason;


How to disconnect from RabbitMQ properly using Perl's
AnyEvent::RabbitMQ
?

Answer

There are indicators of reference cycles. These could keep the structure from being properly destructed.

  1. "my $rabbitmq; $rabbitmq = ..." yells out the possibility of a reference cycle.

  2. "my $rabbitmq_channel; $rabbitmq_channel = ..." yells out the possibility of a reference cycle.

  3. $rabbitmq_channel is owned by (stored in) $rabbitmq, but it's also captured by $rabbitmq_channel's event handlers.

The changes marked <=== replace unacceptable code.

The change marked <--- might be necessary. If $rabbitmq_channel is undefined in the callbacks, remove this change.

use Scalar::Util qw( weaken );

my $done_cv = AnyEvent->condvar;

my $rabbitmq = AnyEvent::RabbitMQ->new->load_xml_spec()->connect(  # <===
  host       => '127.0.0.1',
  port       => 5672,
  user       => 'guest',
  pass       => 'guest',
  vhost      => '/',
  timeout    => 1,
  tls        => 0,
  tune       => { heartbeat => 1 },
  on_success => sub {
    my ($rabbitmq) = @_;  # <===
    $rabbitmq->open_channel(
      on_success => sub {
        my ($rabbitmq_channel) = @_;  # <===
        {  # <---
          my $rabbitmq_channel = weaken($rabbitmq_channel);  # <---
          $rabbitmq_channel->confirm;
          $rabbitmq_channel->declare_exchange(
            exchange   => 'test_exchange',
            type       => 'fanout',
            on_success => sub {
              $rabbitmq_channel->bind_exchange(
                source      => 'test_exchange',
                destination => 'test_exchange',
                routing_key => '',
                on_success  => sub {
                  $rabbitmq_channel->declare_queue(
                    queue      => 'test_queue',
                    on_success => sub {
                      $rabbitmq_channel->bind_queue(
                        queue       => 'test_queue',
                        exchange    => 'test_exchange',
                        routing_key => '',
                        on_success  => sub { $done_cv->send( __LINE__, @_ ) },  # <===
                        on_failure => sub { $done_cv->send( __LINE__, @_ ) },
                      );
                    },
                    on_failure => sub { $done_cv->send( __LINE__, @_ ) },
                  );
                },
                on_failure => sub { $done_cv->send( __LINE__, @_ ) },
              );
            },
            on_failure => sub { $done_cv->send( __LINE__, @_ ) },
          );
        }  # <---
      },
      on_failure => sub { $done_cv->send( __LINE__, @_ ) },
      on_return  => sub { $done_cv->send( __LINE__, @_ ) },
      on_close   => sub { $done_cv->send( __LINE__, @_ ) },
    );
  },
  on_failure      => sub { $done_cv->send( __LINE__, @_ ) },
  on_read_failure => sub { $done_cv->send( __LINE__, @_ ) },
  on_return       => sub { $done_cv->send( __LINE__, @_ ) },
  on_close        => sub { $done_cv->send( __LINE__, @_ ) },
);

my $reason = [ $done_cv->recv ];
p $reason;

I hope this helps.

Comments