RabbitMQ - 在消息到达到期时间后路由消息
RabbitMQ - routing messages after they reach their expiration time
我最近发现了 RabbitMQ 的功能,它允许您延迟消息,而且效果很好,尽管我找不到任何与我需要的类似的示例:
假设有 3 种类型的消息:A、B 和 C。我们有 2 个 delay_queues 具有 1 小时和 2 小时 'x-message-ttl
值。还有 3 种类型的 destination_queues - 每种用于特定的消息类型。
我想要实现的是,在 delay_queues 之一中的消息达到其 TTL 后,它将根据其类型路由到 destination_queues 之一。像这样:
这甚至可以使用 RabbitMQ 消息属性吗?有任何想法吗?我的代码将消息发送到延迟队列(过期后它们被发送到问候队列):
#!/usr/bin/env python
import pika
connection = pika.BlockingConnection(pika.ConnectionParameters(
'localhost'))
channel = connection.channel()
channel.confirm_delivery()
channel.queue_declare(queue='hello', durable=True)
channel.queue_bind(exchange='amq.direct',
queue='hello')
delay_channel = connection.channel()
delay_channel.confirm_delivery()
delay_channel.queue_declare(queue='hello_delay', durable=True, arguments={
'x-message-ttl' : 3600000,
'x-dead-letter-exchange' : 'amq.direct',
'x-dead-letter-routing-key' : 'hello'
})
while 1 :
delay_channel.basic_publish(exchange='',
routing_key='hello_delay',
body="test",
properties=pika.BasicProperties(delivery_mode=2))
print "Sent to delay queue"
当消息因为达到规定的 TTL 而过期时,可以将它们重定向到 dead-letter exchange,这就是我认为您在消息过期后隐式将消息移动到另一个 queue 的方法.
您可以 select 不同的目的地 queue 通过使用消息的原始 routing-keys,或者最终 "CC" 和 "BCC" 消息 headers.
好的,我已经设法找到了解决方案。不确定它是否是最好的,但它确实有效。
- 我创建了两个交易所:DELAY_EXCHANGE 和 ROUTER_EXCHANGE
- 我将 DELAY_EXCHANGE 绑定到 delay_queue(我使用的所有 routing_keys)
- 延迟队列设置为
x-dead-letter-exchange: ROUTER_EXCHANGE
和 x-message-ttl: 14000
- 我将 ROUTER_EXCHANGE 绑定到具有相应 routing_keys 的所有队列 (A、B、C)。
这种方式在发送(推送)消息时我没有指定队列,只是交换 routing_key:
delay_channel.basic_publish(exchange='delay_exchange',
routing_key='helloC',
body="test",
properties=pika.BasicProperties(delivery_mode=2))
消息被推送到 DELAY_EXCHANGE,后者将其定向到 delay_queue,在那里等待其 TTL。当消息过期时,它被重定向到 ROUTER_EXCHANGE,后者分析其 routing_key 并将其重定向到目标队列之一。厉害了。
您可以使用新的 RabbitMQ 延迟消息交换https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/
我最近发现了 RabbitMQ 的功能,它允许您延迟消息,而且效果很好,尽管我找不到任何与我需要的类似的示例:
假设有 3 种类型的消息:A、B 和 C。我们有 2 个 delay_queues 具有 1 小时和 2 小时 'x-message-ttl
值。还有 3 种类型的 destination_queues - 每种用于特定的消息类型。
我想要实现的是,在 delay_queues 之一中的消息达到其 TTL 后,它将根据其类型路由到 destination_queues 之一。像这样:
这甚至可以使用 RabbitMQ 消息属性吗?有任何想法吗?我的代码将消息发送到延迟队列(过期后它们被发送到问候队列):
#!/usr/bin/env python
import pika
connection = pika.BlockingConnection(pika.ConnectionParameters(
'localhost'))
channel = connection.channel()
channel.confirm_delivery()
channel.queue_declare(queue='hello', durable=True)
channel.queue_bind(exchange='amq.direct',
queue='hello')
delay_channel = connection.channel()
delay_channel.confirm_delivery()
delay_channel.queue_declare(queue='hello_delay', durable=True, arguments={
'x-message-ttl' : 3600000,
'x-dead-letter-exchange' : 'amq.direct',
'x-dead-letter-routing-key' : 'hello'
})
while 1 :
delay_channel.basic_publish(exchange='',
routing_key='hello_delay',
body="test",
properties=pika.BasicProperties(delivery_mode=2))
print "Sent to delay queue"
当消息因为达到规定的 TTL 而过期时,可以将它们重定向到 dead-letter exchange,这就是我认为您在消息过期后隐式将消息移动到另一个 queue 的方法.
您可以 select 不同的目的地 queue 通过使用消息的原始 routing-keys,或者最终 "CC" 和 "BCC" 消息 headers.
好的,我已经设法找到了解决方案。不确定它是否是最好的,但它确实有效。
- 我创建了两个交易所:DELAY_EXCHANGE 和 ROUTER_EXCHANGE
- 我将 DELAY_EXCHANGE 绑定到 delay_queue(我使用的所有 routing_keys)
- 延迟队列设置为
x-dead-letter-exchange: ROUTER_EXCHANGE
和x-message-ttl: 14000
- 我将 ROUTER_EXCHANGE 绑定到具有相应 routing_keys 的所有队列 (A、B、C)。
这种方式在发送(推送)消息时我没有指定队列,只是交换 routing_key:
delay_channel.basic_publish(exchange='delay_exchange',
routing_key='helloC',
body="test",
properties=pika.BasicProperties(delivery_mode=2))
消息被推送到 DELAY_EXCHANGE,后者将其定向到 delay_queue,在那里等待其 TTL。当消息过期时,它被重定向到 ROUTER_EXCHANGE,后者分析其 routing_key 并将其重定向到目标队列之一。厉害了。
您可以使用新的 RabbitMQ 延迟消息交换https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/