使用Kombu ConsumerMixin,如何声明多个绑定?
Using Kombu ConsumerMixin, how to declare multiple bindings?
我有一个名为 experiment
的 RabbitMQ 主题交换。我正在构建一个消费者,我想在其中接收路由键以 "foo" 开头的所有消息以及路由键以 "bar".
开头的所有消息
根据 RabbitMQ 文档,并根据我自己在管理方面的实验 UI,应该可以有一个交换器、一个队列和两个绑定(foo.#
和 bar.#
) 连接它们。
我不知道如何使用 Kombu 的 ConsumerMixin 来表达这一点。我觉得我应该可以做到:
q = Queue(exchange=exchange, routing_key=['foo.#', 'bar.#'])
...但它根本不喜欢那样。我也试过:
q.bind_to(exchange=exchange, routing_key='foo.#')
q.bind_to(exchange=exchange, routing_key='bar.#')
...但每次我尝试我都会得到:
kombu.exceptions.NotBoundError: Can't call method on Queue not bound to a channel
...我想这很有道理。但是,我在 mixin 的界面中看不到一个地方,一旦它们绑定到通道,我就可以轻松地连接到队列。这是基本(工作)代码:
from kombu import Connection, Exchange, Queue
from kombu.mixins import ConsumerMixin
class Worker(ConsumerMixin):
exchange = Exchange('experiment', type='topic')
q = Queue(exchange=exchange, routing_key='foo.#', exclusive=True)
def __init__(self, connection):
self.connection = connection
def get_consumers(self, Consumer, channel):
return [Consumer(queues=[self.q], callbacks=[self.on_task])]
def on_task(self, body, message):
print body
message.ack()
if __name__ == '__main__':
with Connection('amqp://guest:guest@localhost:5672//') as conn:
worker = Worker(conn)
worker.run()
...有效,但只给我 foo
消息。除了为我感兴趣的每个路由键创建一个新队列并将它们全部传递给消费者之外,是否有一种干净的方法可以做到这一点?
经过一番挖掘,我找到了一种与我的第一个想法非常接近的方法。不是将 routing_key
字符串传递给队列,而是传递 bindings
列表。列表中的每个元素都是指定交换和路由密钥的 binding
对象的一个实例。
一例抵千言:
from kombu import Exchange, Queue, binding
exchange = Exchange('experiment', type='topic')
q = Queue(exchange=exchange, bindings=[
binding(exchange, routing_key='foo.#'),
binding(exchange, routing_key='bar.#')
], exclusive=True)
而且效果很好!
这里是对答案的小幅调整。当 bindings
参数用于定义绑定时,exchange
参数将被忽略。
调整后的例子:
from kombu import Exchange, Queue, binding
exchange = Exchange('experiment', type='topic')
q = Queue(bindings=[
binding(exchange, routing_key='foo.#'),
binding(exchange, routing_key='bar.#'),
])
exchange
参数在队列初始化期间被丢弃:
if self.bindings:
self.exchange = None
我有一个名为 experiment
的 RabbitMQ 主题交换。我正在构建一个消费者,我想在其中接收路由键以 "foo" 开头的所有消息以及路由键以 "bar".
根据 RabbitMQ 文档,并根据我自己在管理方面的实验 UI,应该可以有一个交换器、一个队列和两个绑定(foo.#
和 bar.#
) 连接它们。
我不知道如何使用 Kombu 的 ConsumerMixin 来表达这一点。我觉得我应该可以做到:
q = Queue(exchange=exchange, routing_key=['foo.#', 'bar.#'])
...但它根本不喜欢那样。我也试过:
q.bind_to(exchange=exchange, routing_key='foo.#')
q.bind_to(exchange=exchange, routing_key='bar.#')
...但每次我尝试我都会得到:
kombu.exceptions.NotBoundError: Can't call method on Queue not bound to a channel
...我想这很有道理。但是,我在 mixin 的界面中看不到一个地方,一旦它们绑定到通道,我就可以轻松地连接到队列。这是基本(工作)代码:
from kombu import Connection, Exchange, Queue
from kombu.mixins import ConsumerMixin
class Worker(ConsumerMixin):
exchange = Exchange('experiment', type='topic')
q = Queue(exchange=exchange, routing_key='foo.#', exclusive=True)
def __init__(self, connection):
self.connection = connection
def get_consumers(self, Consumer, channel):
return [Consumer(queues=[self.q], callbacks=[self.on_task])]
def on_task(self, body, message):
print body
message.ack()
if __name__ == '__main__':
with Connection('amqp://guest:guest@localhost:5672//') as conn:
worker = Worker(conn)
worker.run()
...有效,但只给我 foo
消息。除了为我感兴趣的每个路由键创建一个新队列并将它们全部传递给消费者之外,是否有一种干净的方法可以做到这一点?
经过一番挖掘,我找到了一种与我的第一个想法非常接近的方法。不是将 routing_key
字符串传递给队列,而是传递 bindings
列表。列表中的每个元素都是指定交换和路由密钥的 binding
对象的一个实例。
一例抵千言:
from kombu import Exchange, Queue, binding
exchange = Exchange('experiment', type='topic')
q = Queue(exchange=exchange, bindings=[
binding(exchange, routing_key='foo.#'),
binding(exchange, routing_key='bar.#')
], exclusive=True)
而且效果很好!
这里是bindings
参数用于定义绑定时,exchange
参数将被忽略。
调整后的例子:
from kombu import Exchange, Queue, binding
exchange = Exchange('experiment', type='topic')
q = Queue(bindings=[
binding(exchange, routing_key='foo.#'),
binding(exchange, routing_key='bar.#'),
])
exchange
参数在队列初始化期间被丢弃:
if self.bindings:
self.exchange = None