使用 php-amqlib 从 RabbitMQ 读取消息不起作用
Reading messages from RabbitMQ with php-amqlib is not working
我正在与 PHP 和 RabbitMQ 进行客户端-服务器通信,使用 php-amqplib。
我有一个看起来工作正常的生产者脚本,但我的消费者没有收到任何东西。
我用 sudo rabbitmqctl list_queues
检查队列中的条目,每次我 运行 生产者后,计数器都会递增。
我的消费者启动时没有任何 PHP 错误,看起来像是在等待消息。
看起来不太好的是 运行s 回调脚本一次,在开始时有一次空传入消息 - 然后什么都不做。
php consumer.php
string(47) " [*] Waiting for messages. To exit press CTRL+C"
string(1) "
"
string(10) "Received: "
这是我的代码:
producer.php
public function sendDataToRabbitMQ()
{
$id = $_POST['id'];
$ipAddress = $_POST['ip'];
$date = date("Y-m-d h:i:s");
$status = false;
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest'); //host: RABBITMQ_HOST
$channel = $connection->channel();
$channel->queue_declare('first_queue', false, true, false, false);
if(is_array($argv)) {
$data = implode(' ', array_slice($argv, 1));
}
if (empty($data)) {
$data = "$ipAddress,$id";
}
$msg = new AMQPMessage($data, array('delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT));
$channel->basic_publish($msg, '', 'first_queue');
echo " [x] Sent data:", "\n", $data, "\n";
$channel->close();
$connection->close();
return $result;
}
(出于测试目的将主机名常量更改为 localhost)
consumer.php
<?php
require_once '../vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
class Consumer
{
private $token;
private function getQueue()
{
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();
$channel->queue_declare('ban_queue', false, true, false, false);
var_dump(' [*] Waiting for messages. To exit press CTRL+C', "\n");
$callback = function($msg) {
$message = explode(',', $msg->body);
var_dump('Received: '.$message[0]);
$msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']);
};
$channel->basic_qos(null, 1, null);
$channel->basic_consume('first_queue', '', false, false, false, false, $callback);
while (count($channel->callbacks)) {
$channel->wait();
}
$channel->close();
$connection->close();
}
public function processResult()
{
$this->getQueue();
}
}
$consumer = new Consumer();
$consumer->processResult();
为什么不起作用?我发现 rabbitmq/php-amqplib 教程和文档非常无用,并且在半天多的时间里完全解决了这个问题。任何帮助表示赞赏。
更新 1
我也从这个站点检查了这个 ,我的代码与它一致。
经过一段时间的研究和测试,我找到了解决上述问题的方法:
我改了
$channel->basic_consume('first_queue', '', false, false, false, false, $callback);
至
$channel->basic_consume('first_queue', '', false, true, false, false, $callback);
成功了。
我正在与 PHP 和 RabbitMQ 进行客户端-服务器通信,使用 php-amqplib。
我有一个看起来工作正常的生产者脚本,但我的消费者没有收到任何东西。
我用 sudo rabbitmqctl list_queues
检查队列中的条目,每次我 运行 生产者后,计数器都会递增。
我的消费者启动时没有任何 PHP 错误,看起来像是在等待消息。 看起来不太好的是 运行s 回调脚本一次,在开始时有一次空传入消息 - 然后什么都不做。
php consumer.php
string(47) " [*] Waiting for messages. To exit press CTRL+C"
string(1) "
"
string(10) "Received: "
这是我的代码:
producer.php
public function sendDataToRabbitMQ()
{
$id = $_POST['id'];
$ipAddress = $_POST['ip'];
$date = date("Y-m-d h:i:s");
$status = false;
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest'); //host: RABBITMQ_HOST
$channel = $connection->channel();
$channel->queue_declare('first_queue', false, true, false, false);
if(is_array($argv)) {
$data = implode(' ', array_slice($argv, 1));
}
if (empty($data)) {
$data = "$ipAddress,$id";
}
$msg = new AMQPMessage($data, array('delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT));
$channel->basic_publish($msg, '', 'first_queue');
echo " [x] Sent data:", "\n", $data, "\n";
$channel->close();
$connection->close();
return $result;
}
(出于测试目的将主机名常量更改为 localhost)
consumer.php
<?php
require_once '../vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
class Consumer
{
private $token;
private function getQueue()
{
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();
$channel->queue_declare('ban_queue', false, true, false, false);
var_dump(' [*] Waiting for messages. To exit press CTRL+C', "\n");
$callback = function($msg) {
$message = explode(',', $msg->body);
var_dump('Received: '.$message[0]);
$msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']);
};
$channel->basic_qos(null, 1, null);
$channel->basic_consume('first_queue', '', false, false, false, false, $callback);
while (count($channel->callbacks)) {
$channel->wait();
}
$channel->close();
$connection->close();
}
public function processResult()
{
$this->getQueue();
}
}
$consumer = new Consumer();
$consumer->processResult();
为什么不起作用?我发现 rabbitmq/php-amqplib 教程和文档非常无用,并且在半天多的时间里完全解决了这个问题。任何帮助表示赞赏。
更新 1
我也从这个站点检查了这个
经过一段时间的研究和测试,我找到了解决上述问题的方法:
我改了
$channel->basic_consume('first_queue', '', false, false, false, false, $callback);
至
$channel->basic_consume('first_queue', '', false, true, false, false, $callback);
成功了。