RabbitMQ:如何将失败的消息从一个队列移动到另一个队列?

RabbitMQ: How to move failed message from one queue to another queue?

我有两个队列:

当我 运行 rabbitmqadmin list queues vhost name node messages message_stats.publish_details.rate -u admin -p admin:

时同样可见

我得到:

+-------+-------------------------+-------------------------+----------+------------------------------------+
| vhost |          name           |          node           | messages | message_stats.publish_details.rate |
+-------+-------------------------+-------------------------+----------+------------------------------------+
| /     | high_priority           | rabbit@server-rabbitmq  | 5        | 0.0                                |
| /     | high_priority_secondary | rabbit@server-rabbitmq  | 0        | 0.0                                |
+-------+-------------------------+-------------------------+----------+------------------------------------+

我的交流(rabbitmqadmin -V / list exchanges -u admin -p admin)如下:

+-------------------------+---------+
|          name           |  type   |
+-------------------------+---------+
|                         | direct  |
| amq.direct              | direct  |
| amq.fanout              | fanout  |
| amq.headers             | headers |
| amq.match               | headers |
| amq.rabbitmq.trace      | topic   |
| amq.topic               | topic   |
| high_priority           | direct  |
| high_priority_secondary | direct  |
| low_priority            | direct  |
+-------------------------+---------+

队列和整个相关逻辑在 PHP/Symfony 中实现,但是我想使用原生逻辑(如果可能)通过使用 [=14] =] 或终端中的 rabbitmqctl 命令。

如果 high_priority 上的消息失败,我希望 RabbitMQ 自动将其移至 high_priority_secondary 队列,而无需任何 PHP 参与。这可能吗?我已经开始阅读有关 Dead Letter Exchanges 的内容,但我不确定如何处理。

我已经为辅助队列创建了一个消费者,所以一旦消息移到那里,它就会被处理。

是否可以仅在 CLI 中实现?

仅供参考:SO 上有一些建议的帖子已经涵盖了这个问题,但是 none 的解决方案纯粹是 CLI 解决方案。

high_priority_secondary 队列应绑定到 high_priority_secondary 交换。 high_priority 队列应该绑定到 high_priority 交换并且应该用 x-dead-letter-exchange = high_priority_secondary.

声明

所以队列应该用死信交换声明。

要对此进行测试,当您从 high_priority 队列中使用消息时,只需拒绝并重新排队的消息。

好的,虽然我不需要修改任何 PHP 代码,但我确实必须在框架级别更改 yaml 配置,因为我希望我的解决方案能够持久化并成为代码库。

在你的app/config/services/rabbitmq.yaml中:

定义生产者:

high_priority:
    connection: default
    class: Foo\Infrastructure\RabbitMQ\SuppressedProducer
    exchange_options:
        name: 'high_priority'
        type: direct
high_priority_secondary:
    connection: default
    class: Foo\Infrastructure\RabbitMQ\SuppressedProducer
    exchange_options:
        name: 'high_priority_secondary'
        type: direct
message_hospital:
    connection: default
    class: Foo\Infrastructure\RabbitMQ\SuppressedProducer
    exchange_options:
        name: 'message_hospital'
        type: direct

定义消费者:

high_priority:
    connection: default
    exchange_options:
        name: 'high_priority'
        type: direct
    queue_options:
        name: 'high_priority'
        arguments:
            x-dead-letter-exchange: ['S', 'high_priority_secondary']
    qos_options:
        prefetch_size: 0
        prefetch_count: 1
        global: false
    callback: foo.task_bus.consumer
high_priority_secondary:
    connection: default
    exchange_options:
        name: 'high_priority_secondary'
        type: direct
    queue_options:
        name: 'high_priority_secondary'
        arguments:
            x-dead-letter-exchange: ['S', 'message_hospital']
    qos_options:
        prefetch_size: 0
        prefetch_count: 1
        global: false
    callback: foo.task_bus.consumer
message_hospital:
    connection: default
    exchange_options:
        name: 'message_hospital'
        type: direct
    queue_options:
        name: 'message_hospital'
    qos_options:
        prefetch_size: 0
        prefetch_count: 1
        global: false
    callback: foo.task_bus.consumer

现在队列如下:

多亏了 DLX 属性,消息一旦在之前的队列中失败,就会进入医院队列。