RabbitMQ 收到消息但无处可去
RabbitMQ receives message but it goes nowhere
我在 PuPHPet 生成的虚拟框 (Ubuntu 16.04 x64) 上安装了 RabbitMQ,位于 Windows 10.
设置完成后,我使用rabbitmqctl
配置了一个新用户:
# rabbitmqctl add_user root root
# rabbitmqctl set_permissions -p / root ".*" ".*" ".*"
# rabbitmqctl set_user_tags root administrator
在 PHP & RabbitMQ Tutorial 之后,我设置了发送方和接收方脚本。
发件人脚本如下:
<?php
require_once 'vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
$connection = new AMQPStreamConnection('localhost', 5672, 'root', 'root');
$channel = $connection->channel();
$channel->queue_declare('my_queue', false, false, false, false);
$msg = new AMQPMessage('Hello World!');
$channel->basic_publish($msg, '', 'hello');
echo "Sent 'Hello World!'\n";
$channel->close();
$connection->close();
接收脚本如下:
<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
$connection = new AMQPStreamConnection('localhost', 5672, 'root', 'root');
$channel = $connection->channel();
$channel->queue_declare('my_queue', false, false, false, false);
echo ' [*] Waiting for messages. To exit press CTRL+C', "\n";
$callback = function($msg) {
echo " [x] Received ", $msg->body, "\n";
};
$channel->basic_consume('my_queue', '', false, true, false, false, $callback);
while(count($channel->callbacks)) {
$channel->wait();
}
$channel->close();
$connection->close();
?>
我为接收方脚本打开了一个屏幕,并触发了几次发送方脚本的执行:
$ php sender.php
Sent 'Hello World!'
发送方脚本没有运行任何错误(而且我能够验证队列是在 rabbit 中声明的),但是接收方没有输出它已经接收/消费了任何消息。
此外,使用管理器插件进行的快速检查显示队列中根本没有消息:
$ curl -i -u root:root http://localhost:15672/api/queues
...
{
"messages_details":{
"rate":0.0
},
"messages":0,
"messages_unacknowledged_details":{
"rate":0.0
},
"messages_unacknowledged":0,
"messages_ready_details":{
"rate":0.0
},
"messages_ready":0,
"reductions_details":{
"rate":0.0
},
"reductions":9294,
"node":"rabbit@leon",
"arguments":{
},
"exclusive":false,
"auto_delete":false,
"durable":false,
"vhost":"/",
"name":"my_queue",
"message_bytes_paged_out":0,
"messages_paged_out":0,
"backing_queue_status":{
"avg_ack_egress_rate":0.0,
"avg_ack_ingress_rate":0.0,
"avg_egress_rate":0.0,
"avg_ingress_rate":0.0,
"delta":[
"delta",
"undefined",
0,
0,
"undefined"
],
"len":0,
"mode":"default",
"next_seq_id":0,
"q1":0,
"q2":0,
"q3":0,
"q4":0,
"target_ram_count":"infinity"
},
"head_message_timestamp":null,
"message_bytes_persistent":0,
"message_bytes_ram":0,
"message_bytes_unacknowledged":0,
"message_bytes_ready":0,
"message_bytes":0,
"messages_persistent":0,
"messages_unacknowledged_ram":0,
"messages_ready_ram":0,
"messages_ram":0,
"garbage_collection":{
"minor_gcs":12,
"fullsweep_after":65535,
"min_heap_size":233,
"min_bin_vheap_size":46422,
"max_heap_size":0
},
"state":"running",
"recoverable_slaves":null,
"consumers":0,
"exclusive_consumer_tag":null,
"effective_policy_definition":[
],
"operator_policy":null,
"policy":null,
"consumer_utilisation":null,
"idle_since":"2018-01-28 15:21:22",
"memory":9640
},
...
消息似乎已被接受,但立即被丢弃且未记录。说到日志,rabbit 日志中也没有任何可疑之处。只是很多这样的:
2018-01-28 15:47:43.654 [info] <0.1417.0> accepting AMQP connection <0.1417.0> ([::1]:49058 -> [::1]:5672)
2018-01-28 15:47:43.696 [info] <0.1417.0> connection <0.1417.0> ([::1]:49058 -> [::1]:5672): user 'root' authenticated and granted access to vhost '/'
2018-01-28 15:47:43.742 [info] <0.1417.0> closing AMQP connection <0.1417.0> ([::1]:49058 -> [::1]:5672, vhost: '/', user: 'root')
为什么消息没有通过?
RabbitMQ 团队监控 this mailing list 并且有时只在 Whosebug 上回答问题。
由于您没有将 my_queue
绑定到任何交换,您必须使用 my_queue
作为路由键发布到默认交换。所有队列都使用队列名称作为路由键绑定到默认主题交换。
在您的代码中,您使用 hello
作为路由键。
除了您将消息发送到错误的队列之外,您的代码看起来非常完美。当您将消息发送到不存在的队列时,RabbitMQ 将简单地丢弃该消息,因为它不知道将它发送到哪里。
在您的代码中,您使用默认交换来发送消息。即:
$channel->basic_publish($msg, '', 'hello');
Here we use the default or nameless exchange: messages are routed to
the queue with the name specified by routing_key, if it exists. The
routing key is the third argument to basic_publish
所以当你使用default exchange
时,你必须指定routing key
作为第三个参数,这是你的队列名称。当您使用默认交换时,RabbitMQ 会创建与队列同名的路由键。
要修复您的代码,只需将 hello
更改为您的队列名称,即 my_queue
,它将开始发送和接收。
希望对您有所帮助:)
我在 PuPHPet 生成的虚拟框 (Ubuntu 16.04 x64) 上安装了 RabbitMQ,位于 Windows 10.
设置完成后,我使用rabbitmqctl
配置了一个新用户:
# rabbitmqctl add_user root root
# rabbitmqctl set_permissions -p / root ".*" ".*" ".*"
# rabbitmqctl set_user_tags root administrator
在 PHP & RabbitMQ Tutorial 之后,我设置了发送方和接收方脚本。
发件人脚本如下:
<?php
require_once 'vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
$connection = new AMQPStreamConnection('localhost', 5672, 'root', 'root');
$channel = $connection->channel();
$channel->queue_declare('my_queue', false, false, false, false);
$msg = new AMQPMessage('Hello World!');
$channel->basic_publish($msg, '', 'hello');
echo "Sent 'Hello World!'\n";
$channel->close();
$connection->close();
接收脚本如下:
<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
$connection = new AMQPStreamConnection('localhost', 5672, 'root', 'root');
$channel = $connection->channel();
$channel->queue_declare('my_queue', false, false, false, false);
echo ' [*] Waiting for messages. To exit press CTRL+C', "\n";
$callback = function($msg) {
echo " [x] Received ", $msg->body, "\n";
};
$channel->basic_consume('my_queue', '', false, true, false, false, $callback);
while(count($channel->callbacks)) {
$channel->wait();
}
$channel->close();
$connection->close();
?>
我为接收方脚本打开了一个屏幕,并触发了几次发送方脚本的执行:
$ php sender.php
Sent 'Hello World!'
发送方脚本没有运行任何错误(而且我能够验证队列是在 rabbit 中声明的),但是接收方没有输出它已经接收/消费了任何消息。
此外,使用管理器插件进行的快速检查显示队列中根本没有消息:
$ curl -i -u root:root http://localhost:15672/api/queues
...
{
"messages_details":{
"rate":0.0
},
"messages":0,
"messages_unacknowledged_details":{
"rate":0.0
},
"messages_unacknowledged":0,
"messages_ready_details":{
"rate":0.0
},
"messages_ready":0,
"reductions_details":{
"rate":0.0
},
"reductions":9294,
"node":"rabbit@leon",
"arguments":{
},
"exclusive":false,
"auto_delete":false,
"durable":false,
"vhost":"/",
"name":"my_queue",
"message_bytes_paged_out":0,
"messages_paged_out":0,
"backing_queue_status":{
"avg_ack_egress_rate":0.0,
"avg_ack_ingress_rate":0.0,
"avg_egress_rate":0.0,
"avg_ingress_rate":0.0,
"delta":[
"delta",
"undefined",
0,
0,
"undefined"
],
"len":0,
"mode":"default",
"next_seq_id":0,
"q1":0,
"q2":0,
"q3":0,
"q4":0,
"target_ram_count":"infinity"
},
"head_message_timestamp":null,
"message_bytes_persistent":0,
"message_bytes_ram":0,
"message_bytes_unacknowledged":0,
"message_bytes_ready":0,
"message_bytes":0,
"messages_persistent":0,
"messages_unacknowledged_ram":0,
"messages_ready_ram":0,
"messages_ram":0,
"garbage_collection":{
"minor_gcs":12,
"fullsweep_after":65535,
"min_heap_size":233,
"min_bin_vheap_size":46422,
"max_heap_size":0
},
"state":"running",
"recoverable_slaves":null,
"consumers":0,
"exclusive_consumer_tag":null,
"effective_policy_definition":[
],
"operator_policy":null,
"policy":null,
"consumer_utilisation":null,
"idle_since":"2018-01-28 15:21:22",
"memory":9640
},
...
消息似乎已被接受,但立即被丢弃且未记录。说到日志,rabbit 日志中也没有任何可疑之处。只是很多这样的:
2018-01-28 15:47:43.654 [info] <0.1417.0> accepting AMQP connection <0.1417.0> ([::1]:49058 -> [::1]:5672)
2018-01-28 15:47:43.696 [info] <0.1417.0> connection <0.1417.0> ([::1]:49058 -> [::1]:5672): user 'root' authenticated and granted access to vhost '/'
2018-01-28 15:47:43.742 [info] <0.1417.0> closing AMQP connection <0.1417.0> ([::1]:49058 -> [::1]:5672, vhost: '/', user: 'root')
为什么消息没有通过?
RabbitMQ 团队监控 this mailing list 并且有时只在 Whosebug 上回答问题。
由于您没有将 my_queue
绑定到任何交换,您必须使用 my_queue
作为路由键发布到默认交换。所有队列都使用队列名称作为路由键绑定到默认主题交换。
在您的代码中,您使用 hello
作为路由键。
除了您将消息发送到错误的队列之外,您的代码看起来非常完美。当您将消息发送到不存在的队列时,RabbitMQ 将简单地丢弃该消息,因为它不知道将它发送到哪里。
在您的代码中,您使用默认交换来发送消息。即:
$channel->basic_publish($msg, '', 'hello');
Here we use the default or nameless exchange: messages are routed to the queue with the name specified by routing_key, if it exists. The routing key is the third argument to basic_publish
所以当你使用default exchange
时,你必须指定routing key
作为第三个参数,这是你的队列名称。当您使用默认交换时,RabbitMQ 会创建与队列同名的路由键。
要修复您的代码,只需将 hello
更改为您的队列名称,即 my_queue
,它将开始发送和接收。
希望对您有所帮助:)