如何使用来自 PHP 的 RabbitMQ 延迟消息队列?
How do I use the RabbitMQ delayed message queue from PHP?
我正在尝试使用来自 PHP 的 RabbitMQ Delayed Message Queue,但我的消息只是消失了。
我正在使用以下代码声明交换:
$this->channel->exchange_declare(
'delay',
'x-delayed-message',
false, /* passive, create if exchange doesn't exist */
true, /* durable, persist through server reboots */
false, /* autodelete */
false, /* internal */
false, /* nowait */
['x-delayed-type' => ['S', 'direct']]);
我正在使用此代码绑定队列:
$this->channel->queue_declare(
$queueName,
false, /* Passive */
true, /* Durable */
false, /* Exclusive */
false /* Auto Delete */
);
$this->channel->queue_bind($queueName, "delay", $queueName);
我正在使用此代码发布消息:
$msg = new AMQPMessage(json_encode($msgData), [
'delivery_mode' => 2,
'x-delay' => 5000]);
$this->channel->basic_publish($msg, 'delay', $queueName);
但是消息没有延迟;它仍然立即交付。我错过了什么?
你需要一个路由键来从交换器发布到有问题的队列。
发布到内置直接交换器的原因是因为此交换器是一种特殊情况,它使用路由键作为目标队列名称。
对于您创建的所有交换器和队列,您需要使用路由键在交换器和队列之间创建绑定。然后使用该路由键而不是目标队列名称发布消息。
我不知道创建绑定的 PHP 代码...但它通常看起来像这样:
channel.bind(exhange_name, queue_name, routing_key)
然后在您发布的消息中:
$this->channel->basic_publish($msg, 'delay', $routing_key);
来自here,
消息创建应该是
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Message\AMQPMessage;
use PhpAmqpLib\Wire\AMQPTable;
$msg = new AMQPMessage($data,
array(
'delivery_mode' => 2, # make message persistent
'application_headers' => new AMQPTable([
'x-delay' => 5000
])
)
);
答案是为那些需要消息延迟但不想深入细节的人准备的。您只需要做几件事就可以让它工作:
安装 amqp interop 兼容传输,例如 enqueue/amqp-bunny
和 enqueue/amqp-tools
。
composer require enqueue/amqp-bunny enqueue/amqp-tools
创建amqp上下文,添加延迟策略并发送延迟消息:
<?php
use Enqueue\AmqpTools\RabbitMqDelayPluginDelayStrategy;
use Enqueue\AmqpBunny\AmqpConnectionFactory;
$context = (new AmqpConnectionFactory('amqp://'))->createContext();
$context->setDelayStrategy(new RabbitMqDelayPluginDelayStrategy())
$queue = $context->createQueue('foo');
$context->declareQueue($queue);
$message = $context->createMessage('Hello world!');
$context->createProducer()
->setDeliveryDelay(5000) // 5 sec
->send($queue, $message)
;
顺便说一句,这不是唯一可用的策略。有一个基于 RabbitMQ 死信队列 + ttl。它可以以相同的方式使用。
我正在尝试使用来自 PHP 的 RabbitMQ Delayed Message Queue,但我的消息只是消失了。
我正在使用以下代码声明交换:
$this->channel->exchange_declare(
'delay',
'x-delayed-message',
false, /* passive, create if exchange doesn't exist */
true, /* durable, persist through server reboots */
false, /* autodelete */
false, /* internal */
false, /* nowait */
['x-delayed-type' => ['S', 'direct']]);
我正在使用此代码绑定队列:
$this->channel->queue_declare(
$queueName,
false, /* Passive */
true, /* Durable */
false, /* Exclusive */
false /* Auto Delete */
);
$this->channel->queue_bind($queueName, "delay", $queueName);
我正在使用此代码发布消息:
$msg = new AMQPMessage(json_encode($msgData), [
'delivery_mode' => 2,
'x-delay' => 5000]);
$this->channel->basic_publish($msg, 'delay', $queueName);
但是消息没有延迟;它仍然立即交付。我错过了什么?
你需要一个路由键来从交换器发布到有问题的队列。
发布到内置直接交换器的原因是因为此交换器是一种特殊情况,它使用路由键作为目标队列名称。
对于您创建的所有交换器和队列,您需要使用路由键在交换器和队列之间创建绑定。然后使用该路由键而不是目标队列名称发布消息。
我不知道创建绑定的 PHP 代码...但它通常看起来像这样:
channel.bind(exhange_name, queue_name, routing_key)
然后在您发布的消息中:
$this->channel->basic_publish($msg, 'delay', $routing_key);
来自here,
消息创建应该是
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Message\AMQPMessage;
use PhpAmqpLib\Wire\AMQPTable;
$msg = new AMQPMessage($data,
array(
'delivery_mode' => 2, # make message persistent
'application_headers' => new AMQPTable([
'x-delay' => 5000
])
)
);
答案是为那些需要消息延迟但不想深入细节的人准备的。您只需要做几件事就可以让它工作:
安装 amqp interop 兼容传输,例如 enqueue/amqp-bunny
和 enqueue/amqp-tools
。
composer require enqueue/amqp-bunny enqueue/amqp-tools
创建amqp上下文,添加延迟策略并发送延迟消息:
<?php
use Enqueue\AmqpTools\RabbitMqDelayPluginDelayStrategy;
use Enqueue\AmqpBunny\AmqpConnectionFactory;
$context = (new AmqpConnectionFactory('amqp://'))->createContext();
$context->setDelayStrategy(new RabbitMqDelayPluginDelayStrategy())
$queue = $context->createQueue('foo');
$context->declareQueue($queue);
$message = $context->createMessage('Hello world!');
$context->createProducer()
->setDeliveryDelay(5000) // 5 sec
->send($queue, $message)
;
顺便说一句,这不是唯一可用的策略。有一个基于 RabbitMQ 死信队列 + ttl。它可以以相同的方式使用。