RabbitMQ - 在消息到达到期时间后路由消息

RabbitMQ - routing messages after they reach their expiration time

我最近发现了 RabbitMQ 的功能,它允许您延迟消息,而且效果很好,尽管我找不到任何与我需要的类似的示例:

假设有 3 种类型的消息:A、B 和 C。我们有 2 个 delay_queues 具有 1 小时和 2 小时 'x-message-ttl 值。还有 3 种类型的 destination_queues - 每种用于特定的消息类型。

我想要实现的是,在 delay_queues 之一中的消息达到其 TTL 后,它将根据其类型路由到 destination_queues 之一。像这样:

这甚至可以使用 RabbitMQ 消息属性吗?有任何想法吗?我的代码将消息发送到延迟队列(过期后它们被发送到问候队列):

#!/usr/bin/env python
import pika

connection = pika.BlockingConnection(pika.ConnectionParameters(
               'localhost'))

channel = connection.channel()
channel.confirm_delivery()
channel.queue_declare(queue='hello', durable=True)

channel.queue_bind(exchange='amq.direct',
                   queue='hello')

delay_channel = connection.channel()
delay_channel.confirm_delivery()

delay_channel.queue_declare(queue='hello_delay', durable=True,  arguments={
  'x-message-ttl' : 3600000,
  'x-dead-letter-exchange' : 'amq.direct',
  'x-dead-letter-routing-key' : 'hello'
})

while 1 :
        delay_channel.basic_publish(exchange='',
                      routing_key='hello_delay',
                      body="test",
                      properties=pika.BasicProperties(delivery_mode=2))
        print "Sent to delay queue"

当消息因为达到规定的 TTL 而过期时,可以将它们重定向到 dead-letter exchange,这就是我认为您在消息过期后隐式将消息移动到另一个 queue 的方法.

您可以 select 不同的目的地 queue 通过使用消息的原始 routing-keys,或者最终 "CC" 和 "BCC" 消息 headers.

好的,我已经设法找到了解决方案。不确定它是否是最好的,但它确实有效。

  1. 我创建了两个交易所:DELAY_EXCHANGE 和 ROUTER_EXCHANGE
  2. 我将 DELAY_EXCHANGE 绑定到 delay_queue(我使用的所有 routing_keys)
  3. 延迟队列设置为 x-dead-letter-exchange: ROUTER_EXCHANGEx-message-ttl: 14000
  4. 我将 ROUTER_EXCHANGE 绑定到具有相应 routing_keys 的所有队列 (A、B、C)。

这种方式在发送(推送)消息时我没有指定队列,只是交换 routing_key:

    delay_channel.basic_publish(exchange='delay_exchange',
                  routing_key='helloC',
                  body="test",
                  properties=pika.BasicProperties(delivery_mode=2))

消息被推送到 DELAY_EXCHANGE,后者将其定向到 delay_queue,在那里等待其 TTL。当消息过期时,它被重定向到 ROUTER_EXCHANGE,后者分析其 routing_key 并将其重定向到目标队列之一。厉害了。

您可以使用新的 RabbitMQ 延迟消息交换https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/