如何使用 php-amqplib 从消费者回调关闭 rabbitmq 连接
How to close rabbitmq connections from consumer callback using php-amqplib
我已经能够使用 php-amqplib 库设置连接到 RabbitMQ 的脚本。但是,如果需要,我希望能够退出脚本。
我们认为创建 stop.txt 文件可以解决问题。基本上,一旦收到一条消息,如果脚本看到该文件,它必须停止接收任何进一步的消息,删除该文件并退出脚本。
然而,虽然它正在删除文件,但它不会阻止接收更多消息,也不会退出脚本。
如您所见,我使用与此官方示例完全相同的函数来退出脚本:https://github.com/php-amqplib/php-amqplib/blob/master/demo/amqp_consumer.php。参见第 54 行。
我是做错了什么还是没理解什么?我是这个图书馆的新手。
这是我正在使用的脚本。为了简洁起见,我省略了部分内容。
function process_message($message)
{
// ... do some processing ... //
// ... insert data into DB ... //
// send an acknowledgement back to the producer: message successfully received
$message->ack();
// if a stop file exists then exit the script
if (file_exists("./stop.txt")) {
$message->getChannel()->basic_cancel($message->getConsumerTag());
unlink("./stop.txt"); // delete stop file
}
}
function shutdown($channel, $connection)
{
$channel->close();
$connection->close();
}
$connection = new AMQPStreamConnection(HOST, PORT, USERNAME, PASSWORD, VHOST);
$channel = $connection->channel();
$channel->queue_declare(QUEUE, false, true, false, false);
// make sure only one message is sent at a time
$channel->basic_qos(null, 1, null);
// consume messages from the queue
$channel->basic_consume(QUEUE, 'consumer', false, false, false, false, 'process_message');
register_shutdown_function('shutdown', $channel, $connection);
// Loop as long as the channel has callbacks registered
while ($channel->is_consuming()) {
$channel->wait();
}
提前致谢,
理查德
这似乎关闭了一切。我不知道这是否是最好的解决方案。我基本上是从回调中关闭通道和连接。
// if a stop file exists then exit the script
if (file_exists("./stop.txt")) {
$message->getChannel()->basic_cancel($message->getConsumerTag());
// NEW CODE //
$channel = $message->getChannel();
$connection = $message->getChannel()->getConnection();
$channel->close();
$connection->close();
// END NEW CODE //
unlink("./stop.txt"); // delete stop file
}
我已经能够使用 php-amqplib 库设置连接到 RabbitMQ 的脚本。但是,如果需要,我希望能够退出脚本。
我们认为创建 stop.txt 文件可以解决问题。基本上,一旦收到一条消息,如果脚本看到该文件,它必须停止接收任何进一步的消息,删除该文件并退出脚本。
然而,虽然它正在删除文件,但它不会阻止接收更多消息,也不会退出脚本。
如您所见,我使用与此官方示例完全相同的函数来退出脚本:https://github.com/php-amqplib/php-amqplib/blob/master/demo/amqp_consumer.php。参见第 54 行。
我是做错了什么还是没理解什么?我是这个图书馆的新手。
这是我正在使用的脚本。为了简洁起见,我省略了部分内容。
function process_message($message)
{
// ... do some processing ... //
// ... insert data into DB ... //
// send an acknowledgement back to the producer: message successfully received
$message->ack();
// if a stop file exists then exit the script
if (file_exists("./stop.txt")) {
$message->getChannel()->basic_cancel($message->getConsumerTag());
unlink("./stop.txt"); // delete stop file
}
}
function shutdown($channel, $connection)
{
$channel->close();
$connection->close();
}
$connection = new AMQPStreamConnection(HOST, PORT, USERNAME, PASSWORD, VHOST);
$channel = $connection->channel();
$channel->queue_declare(QUEUE, false, true, false, false);
// make sure only one message is sent at a time
$channel->basic_qos(null, 1, null);
// consume messages from the queue
$channel->basic_consume(QUEUE, 'consumer', false, false, false, false, 'process_message');
register_shutdown_function('shutdown', $channel, $connection);
// Loop as long as the channel has callbacks registered
while ($channel->is_consuming()) {
$channel->wait();
}
提前致谢,
理查德
这似乎关闭了一切。我不知道这是否是最好的解决方案。我基本上是从回调中关闭通道和连接。
// if a stop file exists then exit the script
if (file_exists("./stop.txt")) {
$message->getChannel()->basic_cancel($message->getConsumerTag());
// NEW CODE //
$channel = $message->getChannel();
$connection = $message->getChannel()->getConnection();
$channel->close();
$connection->close();
// END NEW CODE //
unlink("./stop.txt"); // delete stop file
}