Laravel + Rabbit = Crontab vs Supervisor
Laravel + Rabbit = Crontab vs Supervisor
我有一个 php 应用程序 (laravel 5.8) 实现了 rabbitmq 消费者 (bschmitt/laravel-amqp)。
当应用程序启动时,为了控制消费者进程,监管进程也会启动。
我的主管 conf 文件:
[program:message-consume]
process_name=%(program_name)s_%(process_num)02d
command=php artisan message:consume
directory=/var/www
autostart=true
autorestart=true
numprocs=1
redirect_stderr=true
stdout_logfile=/var/www/storage/logs/supervisor.log
stopwaitsecs=3600
startretries=20
消费命令
public function consume($routingKey)
{
Log::info("[MessagingService][consume] start consuming key: " . $ROUTING_KEY . ".*");
Amqp::consume($QUEUE_NAME, function ($message, $resolver) {
# my code...
$resolver->acknowledge($message);
}, [
'routing' => $ROUTING_KEY . '.*',
'queue' => $QUEUE_NAME,
'exchange' => $EXCHANGE,
'exchange_type' => 'topic',
'exchange_durable' => false,
'queue_force_declare' => true,
'queue_exclusive' => false,
'persistent' => true, // set true if consume forever
]);
}
一切似乎都很好,但我意识到,如果一段时间没有消息,应用程序将停止接收消息!
进程仍在运行,主管什么也没做。
这是怎么回事?我从 rabbit dashboard 看到的是没有消费者连接到该频道。
我们的想法是创建一个 cron 脚本,每分钟调用 rabbitmq api 来检查队列中是否有消费者,并在情况下重新启动主管。
我正在使用 docker,所以我更改了我的 startup.sh 以添加 cron
#!/bin/bash
supervisord
crontab /etc/cron.d/check-consumer-cron
cron -f
php-fpm
但我开始问我问题。我过得好吗?我错过了什么吗?我需要主管吗?
如果我按以下方式更改 startup.sh,会发生什么变化?
#!/bin/bash
php artisan message:consume
crontab /etc/cron.d/check-consumer-cron
cron -f
php-fpm
通过重新启动 Supervisor,您正在从错误的一端解决问题。主管的工作是保留您的脚本 运行,您需要了解为什么没有发生这种情况。
我相信你的问题实际上有两个部分:
- 如果闲置时间过长,您的消费者将断开连接。这可能是网络超时;在 AMQP 连接上配置“心跳”设置可能足以使其保持活动状态。
- Supervisor 未检测到此状态并重新启动您的使用者。那是因为它不监视您的消费者的网络状态,只监视 PHP 脚本是否仍然是 运行。您需要在消费者中使用一些代码来退出 PHP 而不是继续等待它永远不会收到的消息。
我不知道 Laravel 包装器,但通常消费者的工作方式是在某处设置无限循环,看起来有点像这样:
while ( true ) {
$connection->waitForAndProcessNextMessage();
}
每次有消息进来,或者超时过期,“waitForAndProcessNextMessage”方法(无论它实际调用什么)都会return,此时你可以决定是否继续循环,或停止并退出脚本。您可以在那里进行状态检查,或者只查看脚本 运行:
while ( true ) {
$connection->waitForAndProcessNextMessage();
if ( ! $connection->isStillConnected() ) {
break;
}
if ( minutesSinceConsumerStarted() > 30 ) {
break;
}
}
我想回答我的问题
我过得好吗?我错过了什么吗?
我使用的包是 php-amqplib 之上的包装器,这个包装器提供的文档有限,而且很糟糕,无法控制内部结构。当然,我无法调试和检查发生了什么。
直接使用 php-amqplib 并遵循他们提供的示例,我再也没有遇到过这种行为。
我需要主管吗?如果我按以下方式更改 startup.sh,会发生什么变化?
那不是重点。
我有一个 php 应用程序 (laravel 5.8) 实现了 rabbitmq 消费者 (bschmitt/laravel-amqp)。
当应用程序启动时,为了控制消费者进程,监管进程也会启动。
我的主管 conf 文件:
[program:message-consume]
process_name=%(program_name)s_%(process_num)02d
command=php artisan message:consume
directory=/var/www
autostart=true
autorestart=true
numprocs=1
redirect_stderr=true
stdout_logfile=/var/www/storage/logs/supervisor.log
stopwaitsecs=3600
startretries=20
消费命令
public function consume($routingKey)
{
Log::info("[MessagingService][consume] start consuming key: " . $ROUTING_KEY . ".*");
Amqp::consume($QUEUE_NAME, function ($message, $resolver) {
# my code...
$resolver->acknowledge($message);
}, [
'routing' => $ROUTING_KEY . '.*',
'queue' => $QUEUE_NAME,
'exchange' => $EXCHANGE,
'exchange_type' => 'topic',
'exchange_durable' => false,
'queue_force_declare' => true,
'queue_exclusive' => false,
'persistent' => true, // set true if consume forever
]);
}
一切似乎都很好,但我意识到,如果一段时间没有消息,应用程序将停止接收消息!
进程仍在运行,主管什么也没做。
这是怎么回事?我从 rabbit dashboard 看到的是没有消费者连接到该频道。
我们的想法是创建一个 cron 脚本,每分钟调用 rabbitmq api 来检查队列中是否有消费者,并在情况下重新启动主管。
我正在使用 docker,所以我更改了我的 startup.sh 以添加 cron
#!/bin/bash
supervisord
crontab /etc/cron.d/check-consumer-cron
cron -f
php-fpm
但我开始问我问题。我过得好吗?我错过了什么吗?我需要主管吗?
如果我按以下方式更改 startup.sh,会发生什么变化?
#!/bin/bash
php artisan message:consume
crontab /etc/cron.d/check-consumer-cron
cron -f
php-fpm
通过重新启动 Supervisor,您正在从错误的一端解决问题。主管的工作是保留您的脚本 运行,您需要了解为什么没有发生这种情况。
我相信你的问题实际上有两个部分:
- 如果闲置时间过长,您的消费者将断开连接。这可能是网络超时;在 AMQP 连接上配置“心跳”设置可能足以使其保持活动状态。
- Supervisor 未检测到此状态并重新启动您的使用者。那是因为它不监视您的消费者的网络状态,只监视 PHP 脚本是否仍然是 运行。您需要在消费者中使用一些代码来退出 PHP 而不是继续等待它永远不会收到的消息。
我不知道 Laravel 包装器,但通常消费者的工作方式是在某处设置无限循环,看起来有点像这样:
while ( true ) {
$connection->waitForAndProcessNextMessage();
}
每次有消息进来,或者超时过期,“waitForAndProcessNextMessage”方法(无论它实际调用什么)都会return,此时你可以决定是否继续循环,或停止并退出脚本。您可以在那里进行状态检查,或者只查看脚本 运行:
while ( true ) {
$connection->waitForAndProcessNextMessage();
if ( ! $connection->isStillConnected() ) {
break;
}
if ( minutesSinceConsumerStarted() > 30 ) {
break;
}
}
我想回答我的问题
我过得好吗?我错过了什么吗?
我使用的包是 php-amqplib 之上的包装器,这个包装器提供的文档有限,而且很糟糕,无法控制内部结构。当然,我无法调试和检查发生了什么。
直接使用 php-amqplib 并遵循他们提供的示例,我再也没有遇到过这种行为。
我需要主管吗?如果我按以下方式更改 startup.sh,会发生什么变化?
那不是重点。