Pheanstalk 两次或更多次保留相同的 beanstalkd 作业

Pheanstalk reserving same beanstalkd job twice or more

我有一个 3 个 beanstalkd 队列进程 运行 在同一个 ip 但不同的端口。我有一个单独的服务器 运行 主管并行生成 php 工作人员(每个 beanstalkd 端口 20 个)来处理队列。我的问题是两个进程似乎可以同时在同一台服务器上保留相同的作业 ID。

这是我的日志中的一些示例输出:

2017-02-23 09:59:56 --> START JOB (port: 11301 | u: 0.45138600 1487861996 | jid:1695074 | pid:30019 | j:leads_to_tags_add | tr:1)
2017-02-23 09:59:57 --> START JOB (port: 11301 | u: 0.55024800 1487861997 | jid:1695074 | pid:30157 | j:leads_to_tags_add | tr:2)
2017-02-23 09:59:58 --> DEL   JOB (port: 11301 | u: 0.54731000 1487861998 | jid:1695074 | pid:30019 | j:leads_to_tags_add)
2017-02-23 09:59:58 --> DEL   JOB (port: 11301 | u: 0.58927900 1487861998 | jid:1695074 | pid:30157 | j:leads_to_tags_add)

似乎这两个保留发生在彼此之后,第二个保留发生在第一个进程完成并删除作业之前。

我在 redis 中为每个 jobid 添加了一个计数器,很明显,当它第二次保留时,计数器上升了一次 (tr)。 TTRR 设置为 3600,因此它不可能在第一个进程完成之前过期。

这是第二次进程保留后的作业状态:

Pheanstalk\Response\ArrayResponse::__set_state(array(
   'id' => '1695074',
   'tube' => 'action-medium',
   'state' => 'reserved',
   'pri' => '0',
   'age' => '1',
   'delay' => '0',
   'ttr' => '3600',
   'time-left' => '3599',
   'file' => '385',
   'reserves' => '2',
   'timeouts' => '0',
   'releases' => '0',
   'buries' => '0',
   'kicks' => '0',
))

这种行为非常随机,有时只有一个进程能够保留直到作业锁定,有时是 2 个,有时甚至是 4 个或更多(很少见)。当然,这会导致正在执行的重复作业数量不一致。

代码的简短版本:

$this->job = $this->pheanstalk->watch($tube)->reserve($timeout);


set_error_handler(function($errno, $errstr, $errfile, $errline, array $errcontext) {
    // error was suppressed with the @-operator
    if (0 === error_reporting()) {
        return false;
    }

    throw new ErrorException($errstr, 0, $errno, $errfile, $errline);
});

$this->log_message('info', __METHOD__ . ": START JOB (" . $this->_logDetails() . " | tr:{$tries})");


if ($this->_process_job()) {
    $this->log_message('info', __METHOD__ . ": FINISHED JOB (" . $this->_logDetails() . ")");
    $this->_delete_job();
} else {
    $this->log_message('error', __METHOD__ . ": FAILED JOB (" . $this->_logDetails() . ")");
}

restore_error_handler();

protected function _delete_job()
{
    $this->pheanstalk->delete($this->job);
    $this->log_message('info', __METHOD__ . ": DELETED JOB (" . $this->_logDetails() . ")");
}

问题是在我的代码的 process_job 位中发生了 TCP 断开连接。 pheanstalk 对象会在执行链的某个地方被覆盖。

我有一个包装 pheanstalk 的库,它在发送 put 命令之前检查可用的最空的 beanstalkd 服务器。然后它将使用最佳服务器的详细信息创建一个新的 Pheanstalk 实例,将其保存为当前连接,然后发送命令。

有时工作人员可以将子作业发送到队列。该库最初将加载到 worker 中以获取作业,然后再次加载到 process_job 中以发送到队列。由于 Codeigniter 中的依赖注入,该库将引用同一个对象,并且在 process_job 内部它会覆盖 worker 的当前连接并导致 TCP 断开连接。