RabbitMQ:如何将失败的消息从一个队列移动到另一个队列?
RabbitMQ: How to move failed message from one queue to another queue?
我有两个队列:
当我 运行 rabbitmqadmin list queues vhost name node messages message_stats.publish_details.rate -u admin -p admin
:
时同样可见
我得到:
+-------+-------------------------+-------------------------+----------+------------------------------------+
| vhost | name | node | messages | message_stats.publish_details.rate |
+-------+-------------------------+-------------------------+----------+------------------------------------+
| / | high_priority | rabbit@server-rabbitmq | 5 | 0.0 |
| / | high_priority_secondary | rabbit@server-rabbitmq | 0 | 0.0 |
+-------+-------------------------+-------------------------+----------+------------------------------------+
我的交流(rabbitmqadmin -V / list exchanges -u admin -p admin
)如下:
+-------------------------+---------+
| name | type |
+-------------------------+---------+
| | direct |
| amq.direct | direct |
| amq.fanout | fanout |
| amq.headers | headers |
| amq.match | headers |
| amq.rabbitmq.trace | topic |
| amq.topic | topic |
| high_priority | direct |
| high_priority_secondary | direct |
| low_priority | direct |
+-------------------------+---------+
队列和整个相关逻辑在 PHP/Symfony 中实现,但是我想使用原生逻辑(如果可能)通过使用 [=14] =] 或终端中的 rabbitmqctl
命令。
如果 high_priority
上的消息失败,我希望 RabbitMQ 自动将其移至 high_priority_secondary
队列,而无需任何 PHP 参与。这可能吗?我已经开始阅读有关 Dead Letter Exchanges 的内容,但我不确定如何处理。
我已经为辅助队列创建了一个消费者,所以一旦消息移到那里,它就会被处理。
是否可以仅在 CLI 中实现?
仅供参考:SO 上有一些建议的帖子已经涵盖了这个问题,但是 none 的解决方案纯粹是 CLI 解决方案。
high_priority_secondary
队列应绑定到 high_priority_secondary
交换。
high_priority
队列应该绑定到 high_priority
交换并且应该用 x-dead-letter-exchange = high_priority_secondary
.
声明
所以队列应该用死信交换声明。
要对此进行测试,当您从 high_priority
队列中使用消息时,只需拒绝并重新排队的消息。
好的,虽然我不需要修改任何 PHP 代码,但我确实必须在框架级别更改 yaml
配置,因为我希望我的解决方案能够持久化并成为代码库。
在你的app/config/services/rabbitmq.yaml
中:
定义生产者:
high_priority:
connection: default
class: Foo\Infrastructure\RabbitMQ\SuppressedProducer
exchange_options:
name: 'high_priority'
type: direct
high_priority_secondary:
connection: default
class: Foo\Infrastructure\RabbitMQ\SuppressedProducer
exchange_options:
name: 'high_priority_secondary'
type: direct
message_hospital:
connection: default
class: Foo\Infrastructure\RabbitMQ\SuppressedProducer
exchange_options:
name: 'message_hospital'
type: direct
定义消费者:
high_priority:
connection: default
exchange_options:
name: 'high_priority'
type: direct
queue_options:
name: 'high_priority'
arguments:
x-dead-letter-exchange: ['S', 'high_priority_secondary']
qos_options:
prefetch_size: 0
prefetch_count: 1
global: false
callback: foo.task_bus.consumer
high_priority_secondary:
connection: default
exchange_options:
name: 'high_priority_secondary'
type: direct
queue_options:
name: 'high_priority_secondary'
arguments:
x-dead-letter-exchange: ['S', 'message_hospital']
qos_options:
prefetch_size: 0
prefetch_count: 1
global: false
callback: foo.task_bus.consumer
message_hospital:
connection: default
exchange_options:
name: 'message_hospital'
type: direct
queue_options:
name: 'message_hospital'
qos_options:
prefetch_size: 0
prefetch_count: 1
global: false
callback: foo.task_bus.consumer
现在队列如下:
多亏了 DLX 属性,消息一旦在之前的队列中失败,就会进入医院队列。
我有两个队列:
当我 运行 rabbitmqadmin list queues vhost name node messages message_stats.publish_details.rate -u admin -p admin
:
我得到:
+-------+-------------------------+-------------------------+----------+------------------------------------+
| vhost | name | node | messages | message_stats.publish_details.rate |
+-------+-------------------------+-------------------------+----------+------------------------------------+
| / | high_priority | rabbit@server-rabbitmq | 5 | 0.0 |
| / | high_priority_secondary | rabbit@server-rabbitmq | 0 | 0.0 |
+-------+-------------------------+-------------------------+----------+------------------------------------+
我的交流(rabbitmqadmin -V / list exchanges -u admin -p admin
)如下:
+-------------------------+---------+
| name | type |
+-------------------------+---------+
| | direct |
| amq.direct | direct |
| amq.fanout | fanout |
| amq.headers | headers |
| amq.match | headers |
| amq.rabbitmq.trace | topic |
| amq.topic | topic |
| high_priority | direct |
| high_priority_secondary | direct |
| low_priority | direct |
+-------------------------+---------+
队列和整个相关逻辑在 PHP/Symfony 中实现,但是我想使用原生逻辑(如果可能)通过使用 [=14] =] 或终端中的 rabbitmqctl
命令。
如果 high_priority
上的消息失败,我希望 RabbitMQ 自动将其移至 high_priority_secondary
队列,而无需任何 PHP 参与。这可能吗?我已经开始阅读有关 Dead Letter Exchanges 的内容,但我不确定如何处理。
我已经为辅助队列创建了一个消费者,所以一旦消息移到那里,它就会被处理。
是否可以仅在 CLI 中实现?
仅供参考:SO 上有一些建议的帖子已经涵盖了这个问题,但是 none 的解决方案纯粹是 CLI 解决方案。
high_priority_secondary
队列应绑定到 high_priority_secondary
交换。
high_priority
队列应该绑定到 high_priority
交换并且应该用 x-dead-letter-exchange = high_priority_secondary
.
所以队列应该用死信交换声明。
要对此进行测试,当您从 high_priority
队列中使用消息时,只需拒绝并重新排队的消息。
好的,虽然我不需要修改任何 PHP 代码,但我确实必须在框架级别更改 yaml
配置,因为我希望我的解决方案能够持久化并成为代码库。
在你的app/config/services/rabbitmq.yaml
中:
定义生产者:
high_priority:
connection: default
class: Foo\Infrastructure\RabbitMQ\SuppressedProducer
exchange_options:
name: 'high_priority'
type: direct
high_priority_secondary:
connection: default
class: Foo\Infrastructure\RabbitMQ\SuppressedProducer
exchange_options:
name: 'high_priority_secondary'
type: direct
message_hospital:
connection: default
class: Foo\Infrastructure\RabbitMQ\SuppressedProducer
exchange_options:
name: 'message_hospital'
type: direct
定义消费者:
high_priority:
connection: default
exchange_options:
name: 'high_priority'
type: direct
queue_options:
name: 'high_priority'
arguments:
x-dead-letter-exchange: ['S', 'high_priority_secondary']
qos_options:
prefetch_size: 0
prefetch_count: 1
global: false
callback: foo.task_bus.consumer
high_priority_secondary:
connection: default
exchange_options:
name: 'high_priority_secondary'
type: direct
queue_options:
name: 'high_priority_secondary'
arguments:
x-dead-letter-exchange: ['S', 'message_hospital']
qos_options:
prefetch_size: 0
prefetch_count: 1
global: false
callback: foo.task_bus.consumer
message_hospital:
connection: default
exchange_options:
name: 'message_hospital'
type: direct
queue_options:
name: 'message_hospital'
qos_options:
prefetch_size: 0
prefetch_count: 1
global: false
callback: foo.task_bus.consumer
现在队列如下:
多亏了 DLX 属性,消息一旦在之前的队列中失败,就会进入医院队列。