使用 PhpAmqpLib 从 rabbitmq 消费者的回调中获取队列大小
Get queue size from rabbitmq consumer's callback with PhpAmqpLib
我想从工作人员的回调中记录工作状态,并在剩余队列中包含一些消息。
到目前为止我找到的唯一解决方案是获取 queue_declare
结果数组的第二个成员,但这应该在每次启动工作程序时调用一次,并且我需要在每条新消息中更新信息。
UPD:
基于的解决方案:
<?php
require_once __DIR__ . '/../vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();
$channel->queue_declare('test1');
echo "[*] Waiting for messages. To exit press CTRL+C\n";
$callback = function ($msg) use ($channel) {
list (, $cn) = $channel->queue_declare('test1', true);
echo ' [x] Received ', $msg->body, " $cn left";
for ($i = 0; $i < $msg->body; ++$i) {
sleep(1);
echo '.';
}
echo "\n";
};
$channel->basic_qos(null, 1, null);
$channel->basic_consume('test1', '', false, true, false, false, $callback);
while (count($channel->callbacks)) {
$channel->wait();
}
出于某种原因,消息计数始终为 0。
queue_declare
方法有一个名为 "passive" 的参数,可用于此目的:它仅按名称检查队列是否存在,并忽略任何其他参数。
If set, the server will reply with Declare-Ok if the queue already exists with the same name, and raise an error if not. The client can use this to check whether a queue exists without modifying the server state. When set, all other method fields except name and no-wait are ignored. A declare with both passive and no-wait has no effect. Arguments are compared for semantic equivalence.
请注意,Declare-Ok
不仅仅是一个状态,而是 the full response structure 的名称,具有字段 queue
、message-count
和 consumer-count
。
在 PHP-AMQPLib 中,您可以使用它来记录一组队列的状态,如下所示:
foreach ( $this->registeredQueues as $queueName ) {
// The second parameter to queue_declare is $passive
// When set to true, everything else is ignored, so need not be passed
list($queueName, $messageCount, $consumerCount)
= $this->rabbitChannel->queue_declare($queueName, true);
$this->logger->info(
"Queue $queueName has $messageCount messages and $consumerCount active consumers."
);
}
我想从工作人员的回调中记录工作状态,并在剩余队列中包含一些消息。
到目前为止我找到的唯一解决方案是获取 queue_declare
结果数组的第二个成员,但这应该在每次启动工作程序时调用一次,并且我需要在每条新消息中更新信息。
UPD:
基于
<?php
require_once __DIR__ . '/../vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();
$channel->queue_declare('test1');
echo "[*] Waiting for messages. To exit press CTRL+C\n";
$callback = function ($msg) use ($channel) {
list (, $cn) = $channel->queue_declare('test1', true);
echo ' [x] Received ', $msg->body, " $cn left";
for ($i = 0; $i < $msg->body; ++$i) {
sleep(1);
echo '.';
}
echo "\n";
};
$channel->basic_qos(null, 1, null);
$channel->basic_consume('test1', '', false, true, false, false, $callback);
while (count($channel->callbacks)) {
$channel->wait();
}
出于某种原因,消息计数始终为 0。
queue_declare
方法有一个名为 "passive" 的参数,可用于此目的:它仅按名称检查队列是否存在,并忽略任何其他参数。
If set, the server will reply with Declare-Ok if the queue already exists with the same name, and raise an error if not. The client can use this to check whether a queue exists without modifying the server state. When set, all other method fields except name and no-wait are ignored. A declare with both passive and no-wait has no effect. Arguments are compared for semantic equivalence.
请注意,Declare-Ok
不仅仅是一个状态,而是 the full response structure 的名称,具有字段 queue
、message-count
和 consumer-count
。
在 PHP-AMQPLib 中,您可以使用它来记录一组队列的状态,如下所示:
foreach ( $this->registeredQueues as $queueName ) {
// The second parameter to queue_declare is $passive
// When set to true, everything else is ignored, so need not be passed
list($queueName, $messageCount, $consumerCount)
= $this->rabbitChannel->queue_declare($queueName, true);
$this->logger->info(
"Queue $queueName has $messageCount messages and $consumerCount active consumers."
);
}