php rabbitmq 消费者重新连接

php rabbitmq consumer reconnect

我有一个使用 RabbitMQ 的 PHP 应用程序。为了冗余,我创建了一对 RabbitMQ 服务器并将它们加入到一个集群中。我还有一个 VyOS 故障转移集群 运行 HAProxy 用于负载平衡连接并在发生故障转移时提供重新路由。

昨天,我们的 VyOS 集群决定需要进行故障转移(可能是短暂的网络中断)。 HAproxy 在虚拟 IP 移动的一个 VyOS 上停止,并在另一个节点上重新启动 HAproxy。

在此之后,我查看了 Rabbit 中的队列,发现每个队列的消费者为零。我检查了机器 运行 消费者仍然有 PHP 运行。我离开他们一段时间,看看他们是否会重新连接,但他们没有。我不得不终止 PHP 脚本并重新启动它们,它们重新连接并立即开始使用。

我认为 RabbitMQ 和 HAproxy 正在按预期工作...现在我需要 PHP 消费者来支持故障转移事件...换句话说,它不仅需要挂起,还需要检测断开连接, 并自动重新连接。

这是我的 RabbitMQ class。提前感谢您的帮助!

<?php
while(true)
{
    try{getMessages("transcode2");}
    catch(Exception $e){echo($e->getMessage()."\n");}
    sleep(1);
}
require_once("../api/db.php");
require_once("../vendor/autoload.php");
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
function sendMessage($msg,$prio)
{
    global $channel;
    $msg=json_encode($msg);
    $queue="transcode2";
    $channel->queue_declare($queue,true,false,false,false);
    $channel->basic_publish(new AMQPMessage($msg,array('priority' => $prio)),'',$queue);
}
function getMessages($queue)
{
    global $connection,$channel;
    $connection=new AMQPStreamConnection(RABBITMQ_SERVER,RABBITMQ_PORT,RABBITMQ_USERNAME,RABBITMQ_PASSWORD);
    $channel=$connection->channel();
    $channel->queue_declare($queue,true,false,false,false);
    $callback=function($msg)
    {
        if(handleMessage(json_decode($msg->body,true)))
        {
            $msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']);
        }
        else
        {
            $msg->delivery_info['channel']->basic_nack($msg->delivery_info['delivery_tag'],false,true);
        }
    };
    $channel->basic_qos(null,1,null);
    $channel->basic_consume($queue,'',false,false,false,false,$callback);
    while(count($channel->callbacks))
    {
        try{$channel->wait();}
        catch(Exception $e)
        {
            break;
        }
    }
    $channel->close();
    $connection->close();
}
?>

如果您将超时参数用于 $channel->wait();

,它可能会起作用

零超时无法正常工作,正如您所说的代理可能会关闭连接并且 PHP 消费者没有发现它。

解决方案是不使用零接收超时。确保连接超时大于接收超时。

这是一个基于 AMQP Interop 的示例:

安装 AMQP Interop 兼容传输,例如:

composer require enqueue/amqp-bunny

该代码执行与您明确设置的超时相同的操作:

<?php
use Enqueue\AmqpBunny\AmqpConnectionFactory;
use Interop\Amqp\AmqpConsumer;
use Interop\Amqp\AmqpMessage;
use Interop\Amqp\AmqpQueue;

$context = (new AmqpConnectionFactory(sprintf(
    'amqp://%s:%s@%s:%s/%2f?connection_timeout=600', // 10 min
    RABBITMQ_USERNAME,RABBITMQ_PASSWORD, RABBITMQ_SERVER, RABBITMQ_PORT
)))->createContext();

$context->setQos(null,1,null);

//sendMessage

$queue = $context->createQueue("transcode2");
$queue->addFlag(AmqpQueue::FLAG_PASSIVE);
$context->declareQueue($queue);

$message = $context->createMessage(json_encode($msg));
$message->setPriority($prio);

$producer = $context->createProducer();
$producer->send($queue, $message);

// getMessages

$consumer = $context->createConsumer($queue);
$context->subscribe($consumer, function(AmqpMessage $message, AmqpConsumer $consumer) {
    if(handleMessage(json_decode($message->getBody(), true))) {
        $consumer->acknowledge($message);
    } else {
        $consumer->reject($message);
    }

    return true;
});

$receiveTimeout = 5000; // 5 seconds, should be lesser than connection_timeout which is 600 seconds now.

$context->consume($receiveTimeout);