阻止消费者进入空队列

Do to block consumer on empty queue

细节

我有使用 PHP 编写的消费者,它尝试使用消息。我的目标很简单——如果队列中没有消息,释放执行并继续,考虑到有 "no data retrieved".

目前的想法

我尝试了 AMQP_NOWAIT 标记,例如:

$flag = AMQP_NOWAIT;
$this->queue->consume($callbackFunction, $flag, $this->consumerTag);

它没有用。到目前为止,我有解决方法,比如 - 我声明 \AMQPConnection 的连接超时,比方说,5 秒,然后以这种方式捕获它:

try {
    $this->consumer->consume($this->consumer->getReadMessageCallback($notifications, $requeue));
} catch (\AMQPConnectionException $connectionException) {
    //based on timeouts. Are there other ways to interrupt empty queue consuming? AMQP_NOWAIT fails, does nothing:
    return [];
}

但是,这是一种非常 "hacky" 的方式。它对我有用,但是:

接下来 - 我尝试 AMQP_IFEMPTY | AMQP_PASSIVE 创建队列。问题是 - 如果那里没有消息,它将删除队列,并且它会在尝试从那里获取消息时引发异常(我可能会捕获)。但随后出现了一个问题,例如 - 队列立即被删除,我什至无法在那里添加消息。

问题

从空队列中读取消息确实是一个常见问题,因此我相信这应该是一种妥善解决的方法。因此,我该怎么做?

是的,手动链接是 /pl/,因为没有 "en" 链接。但它或多或少是可读的,因为它在任何情况下都是英文的。

如果你需要判断队列是否为空,你可以调用AMQPQueue::declare(),它是幂等的,结果returns条消息在队列中计数。请注意,该数字不是很准确(参见 why)。

此外,您只需调用 AMQPQueue::get()(就像在管理工具中完成的一样)。

之后,正如您也尝试过的那样,您可以将 AMQPConnection::setReadTimeout() 设置为某个较低的值(在本地网络中 1 秒可能就足够了),然后调用 AMQPQueue::consume() 并捕获超时异常如果消费者等待时间过长。

关于不良文档,请参阅此问题的答案:Where can i find the php-amqp documentation