rabbitmq 和 php - 用一个工人(经纪人)处理多个队列
rabbitmq and php - Process multiple queues with one worker (broker)
我有 1000 个具有特定名称的队列。所以我想用一个经纪人处理这些队列。可能吗?
队列名称存储在 mysql 数据库中,因此我应该为每个队列获取主题和 运行 代理。当然它应该 运行 异步并且应该能够将排队的项目传递给空闲代理。这可能吗?或者我应该制作 1000 个具有特定队列名称的文件作为代理?
更新:
这是我排队的照片。队列应该 运行 以并行方式而不是串行方式。所以用户是生产者,工人是消费者 运行s send_message()
方法;
我可以向您展示如何使用 enqueue 库。我必须警告你,没有办法在一个进程中异步消费消息。尽管您可以 运行 一些服务于一组队列的进程。它们可以按队列重要性分组。
安装 AMQP 传输和消费库:
composer require enqueue/amqp-ext enqueue/enqueue
创建消费脚本。我假设您已经从数据库中获取了一组队列名称。它们存储在 $queueNames
变量中。该示例将同一处理器绑定到所有队列,但您当然可以设置不同的处理器。
<?php
use Enqueue\AmqpExt\AmqpConnectionFactory;
use Enqueue\Consumption\QueueConsumer;
use Enqueue\Psr\PsrMessage;
use Enqueue\Psr\PsrProcessor;
// here's the list of queue names which you fetched from DB
$queueNames = ['foo_queue', 'bar_queue', 'baz_queue'];
$factory = new AmqpConnectionFactory('amqp://');
$context = $factory->createContext();
// create queues at RabbitMQ side, you can remove it if you do not need it
foreach ($queueNames as $queueName) {
$queue = $context->createQueue($queueName);
$queue->addFlag(AMQP_DURABLE);
$context->declareQueue($queue);
}
$consumer = new QueueConsumer($context);
foreach ($queueNames as $queueName) {
$consumer->bind($queueName, function(PsrMessage $psrMessage) use ($queueName) {
echo 'Consume the message from queue: '.$queueName;
// your processing logic.
return PsrProcessor::ACK;
});
}
$consumer->consume();
中的更多内容
我有 1000 个具有特定名称的队列。所以我想用一个经纪人处理这些队列。可能吗?
队列名称存储在 mysql 数据库中,因此我应该为每个队列获取主题和 运行 代理。当然它应该 运行 异步并且应该能够将排队的项目传递给空闲代理。这可能吗?或者我应该制作 1000 个具有特定队列名称的文件作为代理?
更新:
这是我排队的照片。队列应该 运行 以并行方式而不是串行方式。所以用户是生产者,工人是消费者 运行s send_message()
方法;
我可以向您展示如何使用 enqueue 库。我必须警告你,没有办法在一个进程中异步消费消息。尽管您可以 运行 一些服务于一组队列的进程。它们可以按队列重要性分组。
安装 AMQP 传输和消费库:
composer require enqueue/amqp-ext enqueue/enqueue
创建消费脚本。我假设您已经从数据库中获取了一组队列名称。它们存储在 $queueNames
变量中。该示例将同一处理器绑定到所有队列,但您当然可以设置不同的处理器。
<?php
use Enqueue\AmqpExt\AmqpConnectionFactory;
use Enqueue\Consumption\QueueConsumer;
use Enqueue\Psr\PsrMessage;
use Enqueue\Psr\PsrProcessor;
// here's the list of queue names which you fetched from DB
$queueNames = ['foo_queue', 'bar_queue', 'baz_queue'];
$factory = new AmqpConnectionFactory('amqp://');
$context = $factory->createContext();
// create queues at RabbitMQ side, you can remove it if you do not need it
foreach ($queueNames as $queueName) {
$queue = $context->createQueue($queueName);
$queue->addFlag(AMQP_DURABLE);
$context->declareQueue($queue);
}
$consumer = new QueueConsumer($context);
foreach ($queueNames as $queueName) {
$consumer->bind($queueName, function(PsrMessage $psrMessage) use ($queueName) {
echo 'Consume the message from queue: '.$queueName;
// your processing logic.
return PsrProcessor::ACK;
});
}
$consumer->consume();
中的更多内容