如何使用 Perl 的 AnyEvent::RabbitMQ 正确断开与 RabbitMQ 的连接?
How to disconnect from RabbitMQ properly using Perl's AnyEvent::RabbitMQ?
我想以正确的方式断开与 RabbitMQ 的连接。通过 reviewing the source code of Perl's AnyEvent::RabbitMQ(我正在使用),我找到了 close
方法,它似乎关闭了所有向 RabbitMQ 打开的通道。
所以我
- 连接到 RabbitMQ
- 打开了一个频道
- 宣布交换
- 绑定到该交易所
- 声明了一个队列
- 绑定到那个队列
- 在
AnyEvent::RabbitMQ
实例(不是 ::Channel
实例)上执行 close
方法
连接似乎已关闭,但 RabbitMQ 日志显示 "AMQP connection" 是 "connection_closed_abruptly"。
这是该连接的完整 RabbitMQ 日志:
=INFO REPORT==== 14-Jan-2016::10:02:15 ===
accepting AMQP connection <0.10868.0> (127.0.0.1:57764 -> 127.0.0.1:5672)
=WARNING REPORT==== 14-Jan-2016::10:02:16 ===
closing AMQP connection <0.10868.0> (127.0.0.1:57764 -> 127.0.0.1:5672):
connection_closed_abruptly
示例代码如下:
#!/usr/bin/perl
use strictures 1;
use AnyEvent::RabbitMQ;
use Data::Printer;
my ( $rabbitmq, $rabbitmq_channel );
my $condvar = AnyEvent->condvar;
$rabbitmq = AnyEvent::RabbitMQ->new->load_xml_spec()->connect(
host => '127.0.0.1',
port => 5672,
user => 'guest',
pass => 'guest',
vhost => '/',
timeout => 1,
tls => 0,
tune => { heartbeat => 1 },
on_success => sub {
($rabbitmq) = @_;
$rabbitmq->open_channel(
on_success => sub {
($rabbitmq_channel) = @_;
$rabbitmq_channel->confirm;
$rabbitmq_channel->declare_exchange(
exchange => 'test_exchange',
type => 'fanout',
on_success => sub {
$rabbitmq_channel->bind_exchange(
source => 'test_exchange',
destination => 'test_exchange',
routing_key => '',
on_success => sub {
$rabbitmq_channel->declare_queue(
queue => 'test_queue',
on_success => sub {
$rabbitmq_channel->bind_queue(
queue => 'test_queue',
exchange => 'test_exchange',
routing_key => '',
on_success => sub {
$rabbitmq->close;
undef $rabbitmq;
},
on_failure => sub { $condvar->send( __LINE__, @_ ) },
);
},
on_failure => sub { $condvar->send( __LINE__, @_ ) },
);
},
on_failure => sub { $condvar->send( __LINE__, @_ ) },
);
},
on_failure => sub { $condvar->send( __LINE__, @_ ) },
);
},
on_failure => sub { $condvar->send( __LINE__, @_ ) },
on_return => sub { $condvar->send( __LINE__, @_ ) },
on_close => sub { $condvar->send( __LINE__, @_ ) },
);
},
on_failure => sub { $condvar->send( __LINE__, @_ ) },
on_read_failure => sub { $condvar->send( __LINE__, @_ ) },
on_return => sub { $condvar->send( __LINE__, @_ ) },
on_close => sub { $condvar->send( __LINE__, @_ ) },
);
my $reason = [ $condvar->recv ];
p $reason;
如何使用 Perl AnyEvent::RabbitMQ
正确断开与 RabbitMQ 的连接?
有参考循环指标。这些可以防止结构被正确破坏。
"my $rabbitmq; $rabbitmq = ...
" 大喊引用循环的可能性。
"my $rabbitmq_channel; $rabbitmq_channel = ...
" 大喊引用循环的可能性。
$rabbitmq_channel
属于(存储在)$rabbitmq
,但它也被 $rabbitmq_channel
的事件处理程序捕获。
标记为 <===
的更改替换了不可接受的代码。
标记为 <---
的更改可能是必要的。如果回调中未定义 $rabbitmq_channel
,请删除此更改。
use Scalar::Util qw( weaken );
my $done_cv = AnyEvent->condvar;
my $rabbitmq = AnyEvent::RabbitMQ->new->load_xml_spec()->connect( # <===
host => '127.0.0.1',
port => 5672,
user => 'guest',
pass => 'guest',
vhost => '/',
timeout => 1,
tls => 0,
tune => { heartbeat => 1 },
on_success => sub {
my ($rabbitmq) = @_; # <===
$rabbitmq->open_channel(
on_success => sub {
my ($rabbitmq_channel) = @_; # <===
{ # <---
my $rabbitmq_channel = weaken($rabbitmq_channel); # <---
$rabbitmq_channel->confirm;
$rabbitmq_channel->declare_exchange(
exchange => 'test_exchange',
type => 'fanout',
on_success => sub {
$rabbitmq_channel->bind_exchange(
source => 'test_exchange',
destination => 'test_exchange',
routing_key => '',
on_success => sub {
$rabbitmq_channel->declare_queue(
queue => 'test_queue',
on_success => sub {
$rabbitmq_channel->bind_queue(
queue => 'test_queue',
exchange => 'test_exchange',
routing_key => '',
on_success => sub { $done_cv->send( __LINE__, @_ ) }, # <===
on_failure => sub { $done_cv->send( __LINE__, @_ ) },
);
},
on_failure => sub { $done_cv->send( __LINE__, @_ ) },
);
},
on_failure => sub { $done_cv->send( __LINE__, @_ ) },
);
},
on_failure => sub { $done_cv->send( __LINE__, @_ ) },
);
} # <---
},
on_failure => sub { $done_cv->send( __LINE__, @_ ) },
on_return => sub { $done_cv->send( __LINE__, @_ ) },
on_close => sub { $done_cv->send( __LINE__, @_ ) },
);
},
on_failure => sub { $done_cv->send( __LINE__, @_ ) },
on_read_failure => sub { $done_cv->send( __LINE__, @_ ) },
on_return => sub { $done_cv->send( __LINE__, @_ ) },
on_close => sub { $done_cv->send( __LINE__, @_ ) },
);
my $reason = [ $done_cv->recv ];
p $reason;
希望对您有所帮助。
这个问题是 AnyEvent::RabbitMQ.pm 库本身的错误。我不确定如何修复 sub close
本身,但关键部分是它从不执行在全局解构期间将 Connection::Close
和 Connection::CloseOk
方法发送到 RabbitMQ 服务器的代码。您可以在设置 AMQP 连接后通过执行以下操作来确认。
$rabbitmq->_push_write(Net::AMQP::Protocol::Connection::Close->new());
$rabbitmq->_push_write(Net::AMQP::Protocol::Connection::CloseOk->new());
这有点难,所以我正在研究正确的方法,希望维护者会接受拉取请求。
我想以正确的方式断开与 RabbitMQ 的连接。通过 reviewing the source code of Perl's AnyEvent::RabbitMQ(我正在使用),我找到了 close
方法,它似乎关闭了所有向 RabbitMQ 打开的通道。
所以我
- 连接到 RabbitMQ
- 打开了一个频道
- 宣布交换
- 绑定到该交易所
- 声明了一个队列
- 绑定到那个队列
- 在
AnyEvent::RabbitMQ
实例(不是::Channel
实例)上执行close
方法
连接似乎已关闭,但 RabbitMQ 日志显示 "AMQP connection" 是 "connection_closed_abruptly"。
这是该连接的完整 RabbitMQ 日志:
=INFO REPORT==== 14-Jan-2016::10:02:15 ===
accepting AMQP connection <0.10868.0> (127.0.0.1:57764 -> 127.0.0.1:5672)
=WARNING REPORT==== 14-Jan-2016::10:02:16 ===
closing AMQP connection <0.10868.0> (127.0.0.1:57764 -> 127.0.0.1:5672):
connection_closed_abruptly
示例代码如下:
#!/usr/bin/perl
use strictures 1;
use AnyEvent::RabbitMQ;
use Data::Printer;
my ( $rabbitmq, $rabbitmq_channel );
my $condvar = AnyEvent->condvar;
$rabbitmq = AnyEvent::RabbitMQ->new->load_xml_spec()->connect(
host => '127.0.0.1',
port => 5672,
user => 'guest',
pass => 'guest',
vhost => '/',
timeout => 1,
tls => 0,
tune => { heartbeat => 1 },
on_success => sub {
($rabbitmq) = @_;
$rabbitmq->open_channel(
on_success => sub {
($rabbitmq_channel) = @_;
$rabbitmq_channel->confirm;
$rabbitmq_channel->declare_exchange(
exchange => 'test_exchange',
type => 'fanout',
on_success => sub {
$rabbitmq_channel->bind_exchange(
source => 'test_exchange',
destination => 'test_exchange',
routing_key => '',
on_success => sub {
$rabbitmq_channel->declare_queue(
queue => 'test_queue',
on_success => sub {
$rabbitmq_channel->bind_queue(
queue => 'test_queue',
exchange => 'test_exchange',
routing_key => '',
on_success => sub {
$rabbitmq->close;
undef $rabbitmq;
},
on_failure => sub { $condvar->send( __LINE__, @_ ) },
);
},
on_failure => sub { $condvar->send( __LINE__, @_ ) },
);
},
on_failure => sub { $condvar->send( __LINE__, @_ ) },
);
},
on_failure => sub { $condvar->send( __LINE__, @_ ) },
);
},
on_failure => sub { $condvar->send( __LINE__, @_ ) },
on_return => sub { $condvar->send( __LINE__, @_ ) },
on_close => sub { $condvar->send( __LINE__, @_ ) },
);
},
on_failure => sub { $condvar->send( __LINE__, @_ ) },
on_read_failure => sub { $condvar->send( __LINE__, @_ ) },
on_return => sub { $condvar->send( __LINE__, @_ ) },
on_close => sub { $condvar->send( __LINE__, @_ ) },
);
my $reason = [ $condvar->recv ];
p $reason;
如何使用 Perl AnyEvent::RabbitMQ
正确断开与 RabbitMQ 的连接?
有参考循环指标。这些可以防止结构被正确破坏。
"
my $rabbitmq; $rabbitmq = ...
" 大喊引用循环的可能性。"
my $rabbitmq_channel; $rabbitmq_channel = ...
" 大喊引用循环的可能性。$rabbitmq_channel
属于(存储在)$rabbitmq
,但它也被$rabbitmq_channel
的事件处理程序捕获。
标记为 <===
的更改替换了不可接受的代码。
标记为 <---
的更改可能是必要的。如果回调中未定义 $rabbitmq_channel
,请删除此更改。
use Scalar::Util qw( weaken );
my $done_cv = AnyEvent->condvar;
my $rabbitmq = AnyEvent::RabbitMQ->new->load_xml_spec()->connect( # <===
host => '127.0.0.1',
port => 5672,
user => 'guest',
pass => 'guest',
vhost => '/',
timeout => 1,
tls => 0,
tune => { heartbeat => 1 },
on_success => sub {
my ($rabbitmq) = @_; # <===
$rabbitmq->open_channel(
on_success => sub {
my ($rabbitmq_channel) = @_; # <===
{ # <---
my $rabbitmq_channel = weaken($rabbitmq_channel); # <---
$rabbitmq_channel->confirm;
$rabbitmq_channel->declare_exchange(
exchange => 'test_exchange',
type => 'fanout',
on_success => sub {
$rabbitmq_channel->bind_exchange(
source => 'test_exchange',
destination => 'test_exchange',
routing_key => '',
on_success => sub {
$rabbitmq_channel->declare_queue(
queue => 'test_queue',
on_success => sub {
$rabbitmq_channel->bind_queue(
queue => 'test_queue',
exchange => 'test_exchange',
routing_key => '',
on_success => sub { $done_cv->send( __LINE__, @_ ) }, # <===
on_failure => sub { $done_cv->send( __LINE__, @_ ) },
);
},
on_failure => sub { $done_cv->send( __LINE__, @_ ) },
);
},
on_failure => sub { $done_cv->send( __LINE__, @_ ) },
);
},
on_failure => sub { $done_cv->send( __LINE__, @_ ) },
);
} # <---
},
on_failure => sub { $done_cv->send( __LINE__, @_ ) },
on_return => sub { $done_cv->send( __LINE__, @_ ) },
on_close => sub { $done_cv->send( __LINE__, @_ ) },
);
},
on_failure => sub { $done_cv->send( __LINE__, @_ ) },
on_read_failure => sub { $done_cv->send( __LINE__, @_ ) },
on_return => sub { $done_cv->send( __LINE__, @_ ) },
on_close => sub { $done_cv->send( __LINE__, @_ ) },
);
my $reason = [ $done_cv->recv ];
p $reason;
希望对您有所帮助。
这个问题是 AnyEvent::RabbitMQ.pm 库本身的错误。我不确定如何修复 sub close
本身,但关键部分是它从不执行在全局解构期间将 Connection::Close
和 Connection::CloseOk
方法发送到 RabbitMQ 服务器的代码。您可以在设置 AMQP 连接后通过执行以下操作来确认。
$rabbitmq->_push_write(Net::AMQP::Protocol::Connection::Close->new());
$rabbitmq->_push_write(Net::AMQP::Protocol::Connection::CloseOk->new());
这有点难,所以我正在研究正确的方法,希望维护者会接受拉取请求。