RabbitMQ - 如何发送仅由一组消费者中的一个和该组外的所有消费者处理的扇出样式消息
RabbitMQ - How do I send fanout style message that is processed by only one of a group of consumers and all consumers outside the group
我对 rabbitMQ 还很陌生,想知道如何设置系统以最适合我的需要。
假设我有 5 个消费者进程 (C1-5),而 C1-3 只需要其中 一个 来使用该消息。 C4 和 C5 也 需要接收消息。
如何配置 rabbitMQ 来实现这一点?
我考虑过在 C1-3 前面弹出另一个消费者以简单地推送到标准队列,C1-3 将从中消费,但我想知道这是否是额外的工作,rabbitMQ 是否有更好的方法来解决这个问题?
非常感谢任何建议。
亲切的问候,
很棒
这很容易做到,只需将 C1-C3 附加到同一个队列即可。不需要额外的 consumer/publisher 路线。
示例:
(使用 PHP 和 videlalvaro/php-amqplib。)
send.php
声明扇出交换 "events" 并向其发布消息。
use PhpAmqpLib\Connection\AMQPConnection;
use PhpAmqpLib\Message\AMQPMessage;
$exchange = 'events';
$connection = new AMQPConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();
$channel->exchange_declare($exchange, 'fanout', false, false, false);
$data = implode(' ', array_slice($argv, 1));
if(empty($data)) $data = "Something happened!";
$msg = new AMQPMessage($data);
$channel->basic_publish($msg, $exchange);
echo " [x] Sent ", $data, "\n";
$channel->close();
$connection->close();
pooledreceive.php (C1-C3)
附加到命名的 "pool" 队列。消息将在接收者之间分发。
use PhpAmqpLib\Connection\AMQPConnection;
$exchange = 'events';
$queue = 'pool';
$connection = new AMQPConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();
$channel->exchange_declare($exchange, 'fanout', false, false, false);
$channel->queue_declare($queue);
$channel->queue_bind($queue, $exchange);
echo ' [*] Waiting for events. To exit press CTRL+C', "\n";
$callback = function($msg) {
echo ' [x] ', $msg->body, "\n";
};
$channel->basic_consume($queue, '', false, true, false, false, $callback);
while(count($channel->callbacks)) {
$channel->wait();
}
$channel->close();
$connection->close();
receive.php (C4, C5)
这些接收者中的每一个都有自己的队列和自己的消息副本。
use PhpAmqpLib\Connection\AMQPConnection;
$exchange = 'events';
$connection = new AMQPConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();
$channel->exchange_declare($exchange, 'fanout', false, false, false);
list($queue_name, ,) = $channel->queue_declare('');
$channel->queue_bind($queue_name, $exchange);
echo ' [*] Waiting for events. To exit press CTRL+C', "\n";
$callback = function($msg) {
echo ' [x] ', $msg->body, "\n";
};
$channel->basic_consume($queue_name, '', false, true, false, false, $callback);
while(count($channel->callbacks)) {
$channel->wait();
}
$channel->close();
$connection->close();
我对 rabbitMQ 还很陌生,想知道如何设置系统以最适合我的需要。
假设我有 5 个消费者进程 (C1-5),而 C1-3 只需要其中 一个 来使用该消息。 C4 和 C5 也 需要接收消息。
如何配置 rabbitMQ 来实现这一点?
非常感谢任何建议。
亲切的问候, 很棒
这很容易做到,只需将 C1-C3 附加到同一个队列即可。不需要额外的 consumer/publisher 路线。
示例:
(使用 PHP 和 videlalvaro/php-amqplib。)
send.php
声明扇出交换 "events" 并向其发布消息。
use PhpAmqpLib\Connection\AMQPConnection;
use PhpAmqpLib\Message\AMQPMessage;
$exchange = 'events';
$connection = new AMQPConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();
$channel->exchange_declare($exchange, 'fanout', false, false, false);
$data = implode(' ', array_slice($argv, 1));
if(empty($data)) $data = "Something happened!";
$msg = new AMQPMessage($data);
$channel->basic_publish($msg, $exchange);
echo " [x] Sent ", $data, "\n";
$channel->close();
$connection->close();
pooledreceive.php (C1-C3)
附加到命名的 "pool" 队列。消息将在接收者之间分发。
use PhpAmqpLib\Connection\AMQPConnection;
$exchange = 'events';
$queue = 'pool';
$connection = new AMQPConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();
$channel->exchange_declare($exchange, 'fanout', false, false, false);
$channel->queue_declare($queue);
$channel->queue_bind($queue, $exchange);
echo ' [*] Waiting for events. To exit press CTRL+C', "\n";
$callback = function($msg) {
echo ' [x] ', $msg->body, "\n";
};
$channel->basic_consume($queue, '', false, true, false, false, $callback);
while(count($channel->callbacks)) {
$channel->wait();
}
$channel->close();
$connection->close();
receive.php (C4, C5)
这些接收者中的每一个都有自己的队列和自己的消息副本。
use PhpAmqpLib\Connection\AMQPConnection;
$exchange = 'events';
$connection = new AMQPConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();
$channel->exchange_declare($exchange, 'fanout', false, false, false);
list($queue_name, ,) = $channel->queue_declare('');
$channel->queue_bind($queue_name, $exchange);
echo ' [*] Waiting for events. To exit press CTRL+C', "\n";
$callback = function($msg) {
echo ' [x] ', $msg->body, "\n";
};
$channel->basic_consume($queue_name, '', false, true, false, false, $callback);
while(count($channel->callbacks)) {
$channel->wait();
}
$channel->close();
$connection->close();