如何在 Perl 中处理任何事件、RabbitMQ(心跳)和长 运行 作业?

Howto deal with AnyEvent, RabbitMQ (heartbeat) and long running jobs in Perl?

我正在实现一个用于分布式 cronjob 执行的系统(所谓的 cron 计算集群 )。当行动时间到来时,Cronjobs 应该排队进入消息队列(RabbitMQ)。在另一端(集群的 nodes/workers)是一个 Perl 守护进程,它利用 AnyEvent::RabbitMQ 从消息队列中接收一个 cronjob/task/message,处理任务并请求另一个 [=58] =] 来自消息队列等等。

我使用通过 AnyEvent::RabbitMQ 实现的 RabbitMQ 心跳功能来帮助 RabbitMQ 识别断开的连接。

不用管心跳间隔的实际值!我也有很长的 运行 工作,需要几天时间。因此,将间隔设置为最长的 cronjob 将花费的时间不是一个选项。

请参阅以下代码片段以在 Perl 守护进程工作程序中执行实际的 cronjob。它是在“AnyEvent->timer”中实现的,不会对消息使用 DoSing RabbitMQ。使用此方法是因为 RabbitMQ 的 consume 被禁止(由管理)。

sub _timer_tick {

  $rabbitmq_channel->get(
    queue      => 'job_queue',
    on_success => sub {
      my ($amqp_method) = @_;
      if ( not $amqp_method->{empty} ) {
        pause_timer();
        progress_job($amqp_method);
        resume_timer();
      }
    },
    on_failure => sub { $quit_programm->send( 'RABBITMQ_ERROR', @_ ) },
  );

  return;
}

progress_job() 是解析消息和执行作业的地方。 pause_timer()resume_timer() 控制触发 _timer_tick()AnyEvent->timer

use Capture::Tiny 'capture';
sub progress_job {
  my ($amqp_method) = @_;
  my $job = decode_json( $amqp_method->{body}->to_raw_payload() );
  my ( $stdout, $stderr, $exit ) = capture {
    system $job->{execute};
  };
  return;
}

第一个长 运行 作业进入,系统 "crashes" 出现各种错误消息。有时它抛出 'Unknown channel id: 1',有时它抛出 'Channel has already been closed'。所以我做了 'dumb debug'(试图弄乱配置)并发现当 heartbeat 间隔短于 progress_job() 内所用的时间时,将抛出这些错误。经过一番思考,这是有道理的。 progress_job() 是一个阻塞子例程,AnyEvent 无法继续将心跳包发送到 RabbitMQ。

我对解决 blocking-heatbeat 问题的第一个快速想法是在 child 进程中分叉并执行 progress_job()AnyEvents documentation on FORK 指出当 child 内无法访问事件系统(例如通过 AnyEvent)时,使用 fork 是省事的方法。 接下来的想法是:好吧,没有事件系统的访问权限,所以我可以进行 fork。 但是:计时器应该在 progress_job() 返回后恢复 (resume_timer())。理论上 resume_timer() 会在 fork() 之后调用,而不是在 progress_job() returns 之后调用。所以我停止了实施。

我的问题:最后一点怎么解决?如何在 progress_job() 之后 resume_timer()(或者换句话说,分叉的 child)returns? 由于分叉和事件系统不是 thread-safe.

,我不能将 resume_timer() 放在 child 中

AE 无法处理事件,除非使用 AE-aware 调用阻止程序。 system 不是 AE-aware。请改用 AnyEvent::Util 中的 run_cmd