如何在 Perl 中处理任何事件、RabbitMQ(心跳)和长 运行 作业?
Howto deal with AnyEvent, RabbitMQ (heartbeat) and long running jobs in Perl?
我正在实现一个用于分布式 cronjob 执行的系统(所谓的 cron 计算集群 )。当行动时间到来时,Cronjobs 应该排队进入消息队列(RabbitMQ)。在另一端(集群的 nodes/workers)是一个 Perl 守护进程,它利用 AnyEvent::RabbitMQ
从消息队列中接收一个 cronjob/task/message,处理任务并请求另一个 [=58] =] 来自消息队列等等。
我使用通过 AnyEvent::RabbitMQ
实现的 RabbitMQ 心跳功能来帮助 RabbitMQ 识别断开的连接。
不用管心跳间隔的实际值!我也有很长的 运行 工作,需要几天时间。因此,将间隔设置为最长的 cronjob 将花费的时间不是一个选项。
请参阅以下代码片段以在 Perl 守护进程工作程序中执行实际的 cronjob。它是在“AnyEvent->timer”中实现的,不会对消息使用 DoSing RabbitMQ。使用此方法是因为 RabbitMQ 的 consume
被禁止(由管理)。
sub _timer_tick {
$rabbitmq_channel->get(
queue => 'job_queue',
on_success => sub {
my ($amqp_method) = @_;
if ( not $amqp_method->{empty} ) {
pause_timer();
progress_job($amqp_method);
resume_timer();
}
},
on_failure => sub { $quit_programm->send( 'RABBITMQ_ERROR', @_ ) },
);
return;
}
progress_job()
是解析消息和执行作业的地方。 pause_timer()
和 resume_timer()
控制触发 _timer_tick()
的 AnyEvent->timer
。
use Capture::Tiny 'capture';
sub progress_job {
my ($amqp_method) = @_;
my $job = decode_json( $amqp_method->{body}->to_raw_payload() );
my ( $stdout, $stderr, $exit ) = capture {
system $job->{execute};
};
return;
}
第一个长 运行 作业进入,系统 "crashes" 出现各种错误消息。有时它抛出 'Unknown channel id: 1',有时它抛出 'Channel has already been closed'。所以我做了 'dumb debug'(试图弄乱配置)并发现当 heartbeat
间隔短于 progress_job()
内所用的时间时,将抛出这些错误。经过一番思考,这是有道理的。 progress_job()
是一个阻塞子例程,AnyEvent 无法继续将心跳包发送到 RabbitMQ。
我对解决 blocking-heatbeat 问题的第一个快速想法是在 child 进程中分叉并执行 progress_job()
。 AnyEvents documentation on FORK 指出当 child 内无法访问事件系统(例如通过 AnyEvent)时,使用 fork
是省事的方法。
接下来的想法是:好吧,没有事件系统的访问权限,所以我可以进行 fork。
但是:计时器应该在 progress_job()
返回后恢复 (resume_timer()
)。理论上 resume_timer()
会在 fork()
之后调用,而不是在 progress_job()
returns 之后调用。所以我停止了实施。
我的问题:最后一点怎么解决?如何在 progress_job()
之后 resume_timer()
(或者换句话说,分叉的 child)returns?
由于分叉和事件系统不是 thread-safe.
,我不能将 resume_timer()
放在 child 中
AE 无法处理事件,除非使用 AE-aware 调用阻止程序。 system
不是 AE-aware。请改用 AnyEvent::Util 中的 run_cmd
。
我正在实现一个用于分布式 cronjob 执行的系统(所谓的 cron 计算集群 )。当行动时间到来时,Cronjobs 应该排队进入消息队列(RabbitMQ)。在另一端(集群的 nodes/workers)是一个 Perl 守护进程,它利用 AnyEvent::RabbitMQ
从消息队列中接收一个 cronjob/task/message,处理任务并请求另一个 [=58] =] 来自消息队列等等。
我使用通过 AnyEvent::RabbitMQ
实现的 RabbitMQ 心跳功能来帮助 RabbitMQ 识别断开的连接。
不用管心跳间隔的实际值!我也有很长的 运行 工作,需要几天时间。因此,将间隔设置为最长的 cronjob 将花费的时间不是一个选项。
请参阅以下代码片段以在 Perl 守护进程工作程序中执行实际的 cronjob。它是在“AnyEvent->timer”中实现的,不会对消息使用 DoSing RabbitMQ。使用此方法是因为 RabbitMQ 的 consume
被禁止(由管理)。
sub _timer_tick {
$rabbitmq_channel->get(
queue => 'job_queue',
on_success => sub {
my ($amqp_method) = @_;
if ( not $amqp_method->{empty} ) {
pause_timer();
progress_job($amqp_method);
resume_timer();
}
},
on_failure => sub { $quit_programm->send( 'RABBITMQ_ERROR', @_ ) },
);
return;
}
progress_job()
是解析消息和执行作业的地方。 pause_timer()
和 resume_timer()
控制触发 _timer_tick()
的 AnyEvent->timer
。
use Capture::Tiny 'capture';
sub progress_job {
my ($amqp_method) = @_;
my $job = decode_json( $amqp_method->{body}->to_raw_payload() );
my ( $stdout, $stderr, $exit ) = capture {
system $job->{execute};
};
return;
}
第一个长 运行 作业进入,系统 "crashes" 出现各种错误消息。有时它抛出 'Unknown channel id: 1',有时它抛出 'Channel has already been closed'。所以我做了 'dumb debug'(试图弄乱配置)并发现当 heartbeat
间隔短于 progress_job()
内所用的时间时,将抛出这些错误。经过一番思考,这是有道理的。 progress_job()
是一个阻塞子例程,AnyEvent 无法继续将心跳包发送到 RabbitMQ。
我对解决 blocking-heatbeat 问题的第一个快速想法是在 child 进程中分叉并执行 progress_job()
。 AnyEvents documentation on FORK 指出当 child 内无法访问事件系统(例如通过 AnyEvent)时,使用 fork
是省事的方法。
接下来的想法是:好吧,没有事件系统的访问权限,所以我可以进行 fork。
但是:计时器应该在 progress_job()
返回后恢复 (resume_timer()
)。理论上 resume_timer()
会在 fork()
之后调用,而不是在 progress_job()
returns 之后调用。所以我停止了实施。
我的问题:最后一点怎么解决?如何在 progress_job()
之后 resume_timer()
(或者换句话说,分叉的 child)returns?
由于分叉和事件系统不是 thread-safe.
resume_timer()
放在 child 中
AE 无法处理事件,除非使用 AE-aware 调用阻止程序。 system
不是 AE-aware。请改用 AnyEvent::Util 中的 run_cmd
。