在 PHP 中编写 RabbitMQ 的订阅方法

Writing a subscribe method of RabbitMQ in PHP

我有一个定义如下的函数:

public function subscribe($someQueue)
{
    $callback = function($msg){
        return $msg->body;
    };
    $this->channel->basic_consume( $someQueue, '', FALSE, TRUE, FALSE, FALSE, $callback);
    while(count($this->channel->callbacks)) {
         $this->channel->wait();
    }
}

我正在使用以下功能:

注意:以下行位于不同的 class 文件中,因此创建了包含上述函数的 class 对象。

$objRMQ = new RabbitMQ();
$msgBody = $objRMQ->subscribe("someQueue");
echo "message body returned from someMethod: ".$msgBody; 

基本上,我想 return 将每条消息的主体发送到发布到队列的调用函数。

当前输出:

message body returned from subscribe: NULL

预期输出:

holla, this is your message from queue

由于这个问题很老但仍然没有答案,我会做一个简短的解释。您现在可能已经找到了答案,但这可能会帮助其他人在未来进行搜索。

这里的关键概念是"asynchronous execution"。

当您使用 basic_consume 方法订阅频道时,您并不是要求立即执行一次回调,而是要求在消息可用时执行回调,然后每次都执行另一条消息可用。

在 AMQPLib 的情况下,您通过重复调用 wait() 方法来等待新消息;即这里:

while(count($this->channel->callbacks)) {
     $this->channel->wait();
}

仔细想想,你的代码有两个错误:

  • return $msg->body无处可return。该调用将在 wait() 方法实现的某个深处发生,并且您不会从 $this->channel->wait() 获得任何输出,因此无法对该 returned 值执行任何操作。
  • 另一方面,当您从另一个 class 调用 $objRMQ->subscribe("someQueue") 时,您期望它是 return 某些东西,但该函数没有 return 语句.唯一的 return 语句在您传递给 basic_consume.
  • 的匿名函数中

解决方案基本上是所有你对消息的处理 - echo $msg->body,或者你想做的任何实际处理 - 在回调。如果你真的想在消息传入时收集数据,你可以将它保存到回调外部可访问的某个变量,但请记住,你将在某些时候需要跳出 wait() 循环才能执行任何操作那个数据。