在 PHP 中编写 RabbitMQ 的订阅方法
Writing a subscribe method of RabbitMQ in PHP
我有一个定义如下的函数:
public function subscribe($someQueue)
{
$callback = function($msg){
return $msg->body;
};
$this->channel->basic_consume( $someQueue, '', FALSE, TRUE, FALSE, FALSE, $callback);
while(count($this->channel->callbacks)) {
$this->channel->wait();
}
}
我正在使用以下功能:
注意:以下行位于不同的 class 文件中,因此创建了包含上述函数的 class 对象。
$objRMQ = new RabbitMQ();
$msgBody = $objRMQ->subscribe("someQueue");
echo "message body returned from someMethod: ".$msgBody;
基本上,我想 return 将每条消息的主体发送到发布到队列的调用函数。
当前输出:
message body returned from subscribe: NULL
预期输出:
holla, this is your message from queue
由于这个问题很老但仍然没有答案,我会做一个简短的解释。您现在可能已经找到了答案,但这可能会帮助其他人在未来进行搜索。
这里的关键概念是"asynchronous execution"。
当您使用 basic_consume
方法订阅频道时,您并不是要求立即执行一次回调,而是要求在消息可用时执行回调,然后每次都执行另一条消息可用。
在 AMQPLib 的情况下,您通过重复调用 wait()
方法来等待新消息;即这里:
while(count($this->channel->callbacks)) {
$this->channel->wait();
}
仔细想想,你的代码有两个错误:
- 行
return $msg->body
无处可return到。该调用将在 wait()
方法实现的某个深处发生,并且您不会从 $this->channel->wait()
获得任何输出,因此无法对该 returned 值执行任何操作。
- 另一方面,当您从另一个 class 调用
$objRMQ->subscribe("someQueue")
时,您期望它是 return 某些东西,但该函数没有 return
语句.唯一的 return
语句在您传递给 basic_consume
. 的匿名函数中
解决方案基本上是所有你对消息的处理 - echo $msg->body
,或者你想做的任何实际处理 - 在回调。如果你真的想在消息传入时收集数据,你可以将它保存到回调外部可访问的某个变量,但请记住,你将在某些时候需要跳出 wait()
循环才能执行任何操作那个数据。
我有一个定义如下的函数:
public function subscribe($someQueue)
{
$callback = function($msg){
return $msg->body;
};
$this->channel->basic_consume( $someQueue, '', FALSE, TRUE, FALSE, FALSE, $callback);
while(count($this->channel->callbacks)) {
$this->channel->wait();
}
}
我正在使用以下功能:
注意:以下行位于不同的 class 文件中,因此创建了包含上述函数的 class 对象。
$objRMQ = new RabbitMQ();
$msgBody = $objRMQ->subscribe("someQueue");
echo "message body returned from someMethod: ".$msgBody;
基本上,我想 return 将每条消息的主体发送到发布到队列的调用函数。
当前输出:
message body returned from subscribe: NULL
预期输出:
holla, this is your message from queue
由于这个问题很老但仍然没有答案,我会做一个简短的解释。您现在可能已经找到了答案,但这可能会帮助其他人在未来进行搜索。
这里的关键概念是"asynchronous execution"。
当您使用 basic_consume
方法订阅频道时,您并不是要求立即执行一次回调,而是要求在消息可用时执行回调,然后每次都执行另一条消息可用。
在 AMQPLib 的情况下,您通过重复调用 wait()
方法来等待新消息;即这里:
while(count($this->channel->callbacks)) {
$this->channel->wait();
}
仔细想想,你的代码有两个错误:
- 行
return $msg->body
无处可return到。该调用将在wait()
方法实现的某个深处发生,并且您不会从$this->channel->wait()
获得任何输出,因此无法对该 returned 值执行任何操作。 - 另一方面,当您从另一个 class 调用
$objRMQ->subscribe("someQueue")
时,您期望它是 return 某些东西,但该函数没有return
语句.唯一的return
语句在您传递给basic_consume
. 的匿名函数中
解决方案基本上是所有你对消息的处理 - echo $msg->body
,或者你想做的任何实际处理 - 在回调。如果你真的想在消息传入时收集数据,你可以将它保存到回调外部可访问的某个变量,但请记住,你将在某些时候需要跳出 wait()
循环才能执行任何操作那个数据。