AnyEvent::STOMP::Client + AnyEvent::ForkManger = 间歇性错误
AnyEvent::STOMP::Client + AnyEvent::ForkManger = Intermittent Error
我正在尝试编写一个进程来侦听 ActiveMQ 并基于消息,从 Web 服务中获取数据,进行一些处理,然后将进程数据放入另一个 Web 服务。 (REST/JSON)
下面的模块工作正常,直到我与 returns 交谈的一个不稳定的网络服务出现错误。我已经尝试了很多方法来捕获错误但无济于事。一旦发生网络服务错误,尽管我收到以下消息:
unhandled callback exception on event (MESSAGE,
AnyEvent::STOMP::Client=HASH(0x3ad5e48), HASH(0x3a6bbb0)
{"action":"created","data":{"id":40578737,"type":"alert","who":null},"guid":"ADCCEE0C-73A7-11E6-8084-74B346D1CA67","hostname":"myserver","pid":48632}):
$fork_manager->start() should be called within the manager process
好的,我从概念上理解子进程正在尝试启动另一个进程,而 fork 管理器说这是不行的。但是鉴于下面的模块,启动新进程来处理长 运行 处理的正确方法是什么。或者为什么一个子进程死了导致这个异常,我怎样才能防止这个
这是模块(精简版)
package consumer;
use AnyEvent::ForkManager;
use AnyEvent::STOMP::Client;
use JSON;
use Data::Dumper;
use v5.18;
use Moose;
sub run {
my $self = shift;
my $pm = AnyEvent::ForkManager->new(max_workers => 20);
my $stomp = AnyEvent::STOMP::Client->new();
$stomp->connect();
$stomp->on_connected(sub {
my $stomp = shift;
$stomp->subscribe('/topic/test');
say "Connected to STOMP";
});
$pm->on_start(sub {
my ($pm,$pid,@params) = @_;
say "Starting $pid worker";
});
$pm->on_finish(sub {
my ($pm, $pid,@params) = @_;
say "Finished $pid worker";
});
$pm->on_error(sub {
say Dumper(\@_);
});
$stomp->on_message(sub {
my ($stomp, $header, $body) = @_;
my $href = decode_json $body;
$pm->start(cb => sub {
my ($pm, @params) = @_;
$self->process(@params);
},
args => [ $href->{id}, $href->{data}->{type}, $href->{data}->{who} ],
);
});
my $cv = AnyEvent->condvar;
$cv->recv;
}
sub process {
say "Processing ".Dumper(\@_);
sleep 5;
if ( int(rand(10)) < 5 ) {
die "OOPS"; # this triggers the error message above
}
say "Done Processing $_[1]";
}
1;
以上模块的驱动程序如下:
#!/usr/bin/env perl
use v5.18;
use lib '.';
use consumer;
my $c = consumer->new();
$c->run;
最后一个流量生成器,您可以使用它来查看实际效果:
#!/usr/bin/env perl
use lib '../lib';
use lib '../../lib';
use v5.18;
use Data::Dumper;
use JSON;
use Net::STOMP::Client;
$ENV{'scot_mode'} = "testing";
my $stomp = Net::STOMP::Client->new(
host => "127.0.0.1",
port => 61613
);
$stomp->connect();
for (my $i = 1; $i < 1000000; $i++) {
my $href = {
id => $i,
type => "event",
what => "foo",
};
my $json = encode_json $href;
say "Sending ".Dumper($href);
$stomp->send(
destination => "/topic/test",
body => $json,
);
}
$stomp->disconnect();
我能够通过使用 Try::Catch 并用这样的 try catch 包装对 self->process 的调用来解决这个问题:
$stomp->on_message(sub {
my ($stomp, $header, $body) = @_;
my $href = decode_json $body;
$pm->start(cb => sub {
my ($pm, @params) = @_;
try {
$self->process(@params);
}
catch {
# error handling stuff
};
},
args => [ $href->{id}, $href->{data}->{type}, $href->{data}->{who} ],
);
}
);
我正在尝试编写一个进程来侦听 ActiveMQ 并基于消息,从 Web 服务中获取数据,进行一些处理,然后将进程数据放入另一个 Web 服务。 (REST/JSON)
下面的模块工作正常,直到我与 returns 交谈的一个不稳定的网络服务出现错误。我已经尝试了很多方法来捕获错误但无济于事。一旦发生网络服务错误,尽管我收到以下消息:
unhandled callback exception on event (MESSAGE, AnyEvent::STOMP::Client=HASH(0x3ad5e48), HASH(0x3a6bbb0) {"action":"created","data":{"id":40578737,"type":"alert","who":null},"guid":"ADCCEE0C-73A7-11E6-8084-74B346D1CA67","hostname":"myserver","pid":48632}): $fork_manager->start() should be called within the manager process
好的,我从概念上理解子进程正在尝试启动另一个进程,而 fork 管理器说这是不行的。但是鉴于下面的模块,启动新进程来处理长 运行 处理的正确方法是什么。或者为什么一个子进程死了导致这个异常,我怎样才能防止这个
这是模块(精简版)
package consumer;
use AnyEvent::ForkManager;
use AnyEvent::STOMP::Client;
use JSON;
use Data::Dumper;
use v5.18;
use Moose;
sub run {
my $self = shift;
my $pm = AnyEvent::ForkManager->new(max_workers => 20);
my $stomp = AnyEvent::STOMP::Client->new();
$stomp->connect();
$stomp->on_connected(sub {
my $stomp = shift;
$stomp->subscribe('/topic/test');
say "Connected to STOMP";
});
$pm->on_start(sub {
my ($pm,$pid,@params) = @_;
say "Starting $pid worker";
});
$pm->on_finish(sub {
my ($pm, $pid,@params) = @_;
say "Finished $pid worker";
});
$pm->on_error(sub {
say Dumper(\@_);
});
$stomp->on_message(sub {
my ($stomp, $header, $body) = @_;
my $href = decode_json $body;
$pm->start(cb => sub {
my ($pm, @params) = @_;
$self->process(@params);
},
args => [ $href->{id}, $href->{data}->{type}, $href->{data}->{who} ],
);
});
my $cv = AnyEvent->condvar;
$cv->recv;
}
sub process {
say "Processing ".Dumper(\@_);
sleep 5;
if ( int(rand(10)) < 5 ) {
die "OOPS"; # this triggers the error message above
}
say "Done Processing $_[1]";
}
1;
以上模块的驱动程序如下:
#!/usr/bin/env perl
use v5.18;
use lib '.';
use consumer;
my $c = consumer->new();
$c->run;
最后一个流量生成器,您可以使用它来查看实际效果:
#!/usr/bin/env perl
use lib '../lib';
use lib '../../lib';
use v5.18;
use Data::Dumper;
use JSON;
use Net::STOMP::Client;
$ENV{'scot_mode'} = "testing";
my $stomp = Net::STOMP::Client->new(
host => "127.0.0.1",
port => 61613
);
$stomp->connect();
for (my $i = 1; $i < 1000000; $i++) {
my $href = {
id => $i,
type => "event",
what => "foo",
};
my $json = encode_json $href;
say "Sending ".Dumper($href);
$stomp->send(
destination => "/topic/test",
body => $json,
);
}
$stomp->disconnect();
我能够通过使用 Try::Catch 并用这样的 try catch 包装对 self->process 的调用来解决这个问题:
$stomp->on_message(sub {
my ($stomp, $header, $body) = @_;
my $href = decode_json $body;
$pm->start(cb => sub {
my ($pm, @params) = @_;
try {
$self->process(@params);
}
catch {
# error handling stuff
};
},
args => [ $href->{id}, $href->{data}->{type}, $href->{data}->{who} ],
);
}
);