Laravel + Rabbit = Crontab vs Supervisor

Laravel + Rabbit = Crontab vs Supervisor

我有一个 php 应用程序 (laravel 5.8) 实现了 rabbitmq 消费者 (bschmitt/laravel-amqp)。
当应用程序启动时,为了控制消费者进程,监管进程也会启动。
我的主管 conf 文件:

[program:message-consume]
process_name=%(program_name)s_%(process_num)02d
command=php artisan message:consume 
directory=/var/www
autostart=true
autorestart=true
numprocs=1
redirect_stderr=true
stdout_logfile=/var/www/storage/logs/supervisor.log
stopwaitsecs=3600
startretries=20

消费命令

public function consume($routingKey)
{
  Log::info("[MessagingService][consume] start consuming key: " . $ROUTING_KEY . ".*");
  Amqp::consume($QUEUE_NAME, function ($message, $resolver) {
    # my code...
    $resolver->acknowledge($message);
  }, [
      'routing' => $ROUTING_KEY . '.*',
      'queue' => $QUEUE_NAME,
      'exchange' => $EXCHANGE,
      'exchange_type' => 'topic',
      'exchange_durable' => false,
      'queue_force_declare' => true,
      'queue_exclusive' => false,
      'persistent' => true, // set true if consume forever
  ]);
}

一切似乎都很好,但我意识到,如果一段时间没有消息,应用程序将停止接收消息!

进程仍在运行,主管什么也没做。
这是怎么回事?我从 rabbit dashboard 看到的是没有消费者连接到该频道。

我们的想法是创建一个 cron 脚本,每分钟调用 rabbitmq api 来检查队列中是否有消费者,并在情况下重新启动主管。

我正在使用 docker,所以我更改了我的 startup.sh 以添加 cron

#!/bin/bash

supervisord

crontab /etc/cron.d/check-consumer-cron
cron -f

php-fpm

但我开始问我问题。我过得好吗?我错过了什么吗?我需要主管吗?
如果我按以下方式更改 startup.sh,会发生什么变化?

#!/bin/bash

php artisan message:consume 

crontab /etc/cron.d/check-consumer-cron
cron -f

php-fpm

通过重新启动 Supervisor,您正在从错误的一端解决问题。主管的工作是保留您的脚本 运行,您需要了解为什么没有发生这种情况。

我相信你的问题实际上有两个部分:

  1. 如果闲置时间过长,您的消费者将断开连接。这可能是网络超时;在 AMQP 连接上配置“心跳”设置可能足以使其保持活动状态。
  2. Supervisor 未检测到此状态并重新启动您的使用者。那是因为它不监视您的消费者的网络状态,只监视 PHP 脚本是否仍然是 运行。您需要在消费者中使用一些代码来退出 PHP 而不是继续等待它永远不会收到的消息。

我不知道 Laravel 包装器,但通常消费者的工作方式是在某处设置无限循环,看起来有点像这样:

while ( true ) {
   $connection->waitForAndProcessNextMessage();
}

每次有消息进来,或者超时过期,“waitForAndProcessNextMessage”方法(无论它实际调用什么)都会return,此时你可以决定是否继续循环,或停止并退出脚本。您可以在那里进行状态检查,或者只查看脚本 运行:

while ( true ) {
   $connection->waitForAndProcessNextMessage();
   if ( ! $connection->isStillConnected() ) {
      break;
   }
   if ( minutesSinceConsumerStarted() > 30 ) {
      break;
   }
}

我想回答我的问题

我过得好吗?我错过了什么吗?
我使用的包是 php-amqplib 之上的包装器,这个包装器提供的文档有限,而且很糟糕,无法控制内部结构。当然,我无法调试和检查发生了什么。 直接使用 php-amqplib 并遵循他们提供的示例,我再也没有遇到过这种行为。

我需要主管吗?如果我按以下方式更改 startup.sh,会发生什么变化?
那不是重点。