php rabbitmq 消费者重新连接
php rabbitmq consumer reconnect
我有一个使用 RabbitMQ 的 PHP 应用程序。为了冗余,我创建了一对 RabbitMQ 服务器并将它们加入到一个集群中。我还有一个 VyOS 故障转移集群 运行 HAProxy 用于负载平衡连接并在发生故障转移时提供重新路由。
昨天,我们的 VyOS 集群决定需要进行故障转移(可能是短暂的网络中断)。 HAproxy 在虚拟 IP 移动的一个 VyOS 上停止,并在另一个节点上重新启动 HAproxy。
在此之后,我查看了 Rabbit 中的队列,发现每个队列的消费者为零。我检查了机器 运行 消费者仍然有 PHP 运行。我离开他们一段时间,看看他们是否会重新连接,但他们没有。我不得不终止 PHP 脚本并重新启动它们,它们重新连接并立即开始使用。
我认为 RabbitMQ 和 HAproxy 正在按预期工作...现在我需要 PHP 消费者来支持故障转移事件...换句话说,它不仅需要挂起,还需要检测断开连接, 并自动重新连接。
这是我的 RabbitMQ class。提前感谢您的帮助!
<?php
while(true)
{
try{getMessages("transcode2");}
catch(Exception $e){echo($e->getMessage()."\n");}
sleep(1);
}
require_once("../api/db.php");
require_once("../vendor/autoload.php");
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
function sendMessage($msg,$prio)
{
global $channel;
$msg=json_encode($msg);
$queue="transcode2";
$channel->queue_declare($queue,true,false,false,false);
$channel->basic_publish(new AMQPMessage($msg,array('priority' => $prio)),'',$queue);
}
function getMessages($queue)
{
global $connection,$channel;
$connection=new AMQPStreamConnection(RABBITMQ_SERVER,RABBITMQ_PORT,RABBITMQ_USERNAME,RABBITMQ_PASSWORD);
$channel=$connection->channel();
$channel->queue_declare($queue,true,false,false,false);
$callback=function($msg)
{
if(handleMessage(json_decode($msg->body,true)))
{
$msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']);
}
else
{
$msg->delivery_info['channel']->basic_nack($msg->delivery_info['delivery_tag'],false,true);
}
};
$channel->basic_qos(null,1,null);
$channel->basic_consume($queue,'',false,false,false,false,$callback);
while(count($channel->callbacks))
{
try{$channel->wait();}
catch(Exception $e)
{
break;
}
}
$channel->close();
$connection->close();
}
?>
如果您将超时参数用于 $channel->wait();
,它可能会起作用
零超时无法正常工作,正如您所说的代理可能会关闭连接并且 PHP 消费者没有发现它。
解决方案是不使用零接收超时。确保连接超时大于接收超时。
这是一个基于 AMQP Interop 的示例:
安装 AMQP Interop 兼容传输,例如:
composer require enqueue/amqp-bunny
该代码执行与您明确设置的超时相同的操作:
<?php
use Enqueue\AmqpBunny\AmqpConnectionFactory;
use Interop\Amqp\AmqpConsumer;
use Interop\Amqp\AmqpMessage;
use Interop\Amqp\AmqpQueue;
$context = (new AmqpConnectionFactory(sprintf(
'amqp://%s:%s@%s:%s/%2f?connection_timeout=600', // 10 min
RABBITMQ_USERNAME,RABBITMQ_PASSWORD, RABBITMQ_SERVER, RABBITMQ_PORT
)))->createContext();
$context->setQos(null,1,null);
//sendMessage
$queue = $context->createQueue("transcode2");
$queue->addFlag(AmqpQueue::FLAG_PASSIVE);
$context->declareQueue($queue);
$message = $context->createMessage(json_encode($msg));
$message->setPriority($prio);
$producer = $context->createProducer();
$producer->send($queue, $message);
// getMessages
$consumer = $context->createConsumer($queue);
$context->subscribe($consumer, function(AmqpMessage $message, AmqpConsumer $consumer) {
if(handleMessage(json_decode($message->getBody(), true))) {
$consumer->acknowledge($message);
} else {
$consumer->reject($message);
}
return true;
});
$receiveTimeout = 5000; // 5 seconds, should be lesser than connection_timeout which is 600 seconds now.
$context->consume($receiveTimeout);
我有一个使用 RabbitMQ 的 PHP 应用程序。为了冗余,我创建了一对 RabbitMQ 服务器并将它们加入到一个集群中。我还有一个 VyOS 故障转移集群 运行 HAProxy 用于负载平衡连接并在发生故障转移时提供重新路由。
昨天,我们的 VyOS 集群决定需要进行故障转移(可能是短暂的网络中断)。 HAproxy 在虚拟 IP 移动的一个 VyOS 上停止,并在另一个节点上重新启动 HAproxy。
在此之后,我查看了 Rabbit 中的队列,发现每个队列的消费者为零。我检查了机器 运行 消费者仍然有 PHP 运行。我离开他们一段时间,看看他们是否会重新连接,但他们没有。我不得不终止 PHP 脚本并重新启动它们,它们重新连接并立即开始使用。
我认为 RabbitMQ 和 HAproxy 正在按预期工作...现在我需要 PHP 消费者来支持故障转移事件...换句话说,它不仅需要挂起,还需要检测断开连接, 并自动重新连接。
这是我的 RabbitMQ class。提前感谢您的帮助!
<?php
while(true)
{
try{getMessages("transcode2");}
catch(Exception $e){echo($e->getMessage()."\n");}
sleep(1);
}
require_once("../api/db.php");
require_once("../vendor/autoload.php");
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
function sendMessage($msg,$prio)
{
global $channel;
$msg=json_encode($msg);
$queue="transcode2";
$channel->queue_declare($queue,true,false,false,false);
$channel->basic_publish(new AMQPMessage($msg,array('priority' => $prio)),'',$queue);
}
function getMessages($queue)
{
global $connection,$channel;
$connection=new AMQPStreamConnection(RABBITMQ_SERVER,RABBITMQ_PORT,RABBITMQ_USERNAME,RABBITMQ_PASSWORD);
$channel=$connection->channel();
$channel->queue_declare($queue,true,false,false,false);
$callback=function($msg)
{
if(handleMessage(json_decode($msg->body,true)))
{
$msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']);
}
else
{
$msg->delivery_info['channel']->basic_nack($msg->delivery_info['delivery_tag'],false,true);
}
};
$channel->basic_qos(null,1,null);
$channel->basic_consume($queue,'',false,false,false,false,$callback);
while(count($channel->callbacks))
{
try{$channel->wait();}
catch(Exception $e)
{
break;
}
}
$channel->close();
$connection->close();
}
?>
如果您将超时参数用于 $channel->wait();
,它可能会起作用零超时无法正常工作,正如您所说的代理可能会关闭连接并且 PHP 消费者没有发现它。
解决方案是不使用零接收超时。确保连接超时大于接收超时。
这是一个基于 AMQP Interop 的示例:
安装 AMQP Interop 兼容传输,例如:
composer require enqueue/amqp-bunny
该代码执行与您明确设置的超时相同的操作:
<?php
use Enqueue\AmqpBunny\AmqpConnectionFactory;
use Interop\Amqp\AmqpConsumer;
use Interop\Amqp\AmqpMessage;
use Interop\Amqp\AmqpQueue;
$context = (new AmqpConnectionFactory(sprintf(
'amqp://%s:%s@%s:%s/%2f?connection_timeout=600', // 10 min
RABBITMQ_USERNAME,RABBITMQ_PASSWORD, RABBITMQ_SERVER, RABBITMQ_PORT
)))->createContext();
$context->setQos(null,1,null);
//sendMessage
$queue = $context->createQueue("transcode2");
$queue->addFlag(AmqpQueue::FLAG_PASSIVE);
$context->declareQueue($queue);
$message = $context->createMessage(json_encode($msg));
$message->setPriority($prio);
$producer = $context->createProducer();
$producer->send($queue, $message);
// getMessages
$consumer = $context->createConsumer($queue);
$context->subscribe($consumer, function(AmqpMessage $message, AmqpConsumer $consumer) {
if(handleMessage(json_decode($message->getBody(), true))) {
$consumer->acknowledge($message);
} else {
$consumer->reject($message);
}
return true;
});
$receiveTimeout = 5000; // 5 seconds, should be lesser than connection_timeout which is 600 seconds now.
$context->consume($receiveTimeout);