使用 aioamqp 一次从多个队列中消费
Consuming from multiple queues at once with aioamqp
是否可以使用带有 aioamqp 的一个通道一次使用多个队列?
免责声明:我在项目问题跟踪器中创建了一个 issue,但我真的很想知道我在做什么是否有意义。
我认为AMQP协议没有这个功能(假设我对协议的理解是正确的)。
如果您希望从队列中消费,您必须在频道上发出 basic.consume
调用。此调用所需的参数是 queue_name
,它是一个 "blocking"(不是阻塞连接而是阻塞通道)调用,其中响应是来自队列的对象。
长话短说:每个消费者在等待队列对象时都必须独占访问通道。
好吧,所以我最初的想法是不正确的。在深入研究 AMQP 之后,我发现它实际上支持一个渠道上的多个消费者。但是,如果需要,它确实允许服务器设置它们的限制。不幸的是,我找不到任何关于 RabbitMQ 特定案例的信息。所以我假设没有这样的限制。总而言之:这是库的问题。
但是解决方法仍然有效:只需为每个消费者创建一个频道。它应该工作得很好。
ammoo 有效:
import asyncio
from ammoo import connect
async def consume(channel, queue_name):
async with channel.consume(queue_name, no_ack=True) as consumer:
async for message in consumer:
print('Message from {}: {}'.format(queue_name, message.body))
if message.body == b'quit':
print('Consumer for queue {} quitting'.format(queue_name))
break
async def main():
async with await connect('amqp://localhost/') as connection:
async with connection.channel() as channel:
await channel.declare_queue('queue_a')
await channel.declare_queue('queue_b')
await asyncio.gather(
consume(channel, 'queue_a'),
consume(channel, 'queue_b')
)
print('Both consumers are done')
if __name__ == '__main__':
loop = asyncio.get_event_loop()
loop.run_until_complete(main())
输出:
# python3 test.py
Message from queue_a: b'hello queue a'
Message from queue_b: b'hello queue b'
Message from queue_a: b'quit'
Consumer for queue queue_a quitting
Message from queue_b: b'another message for queue b'
Message from queue_b: b'quit'
Consumer for queue queue_b quitting
Both consumers are done
免责声明:我是图书馆的作者
是否可以使用带有 aioamqp 的一个通道一次使用多个队列?
免责声明:我在项目问题跟踪器中创建了一个 issue,但我真的很想知道我在做什么是否有意义。
我认为AMQP协议没有这个功能(假设我对协议的理解是正确的)。
如果您希望从队列中消费,您必须在频道上发出 basic.consume
调用。此调用所需的参数是 queue_name
,它是一个 "blocking"(不是阻塞连接而是阻塞通道)调用,其中响应是来自队列的对象。
长话短说:每个消费者在等待队列对象时都必须独占访问通道。
好吧,所以我最初的想法是不正确的。在深入研究 AMQP 之后,我发现它实际上支持一个渠道上的多个消费者。但是,如果需要,它确实允许服务器设置它们的限制。不幸的是,我找不到任何关于 RabbitMQ 特定案例的信息。所以我假设没有这样的限制。总而言之:这是库的问题。
但是解决方法仍然有效:只需为每个消费者创建一个频道。它应该工作得很好。
ammoo 有效:
import asyncio
from ammoo import connect
async def consume(channel, queue_name):
async with channel.consume(queue_name, no_ack=True) as consumer:
async for message in consumer:
print('Message from {}: {}'.format(queue_name, message.body))
if message.body == b'quit':
print('Consumer for queue {} quitting'.format(queue_name))
break
async def main():
async with await connect('amqp://localhost/') as connection:
async with connection.channel() as channel:
await channel.declare_queue('queue_a')
await channel.declare_queue('queue_b')
await asyncio.gather(
consume(channel, 'queue_a'),
consume(channel, 'queue_b')
)
print('Both consumers are done')
if __name__ == '__main__':
loop = asyncio.get_event_loop()
loop.run_until_complete(main())
输出:
# python3 test.py
Message from queue_a: b'hello queue a'
Message from queue_b: b'hello queue b'
Message from queue_a: b'quit'
Consumer for queue queue_a quitting
Message from queue_b: b'another message for queue b'
Message from queue_b: b'quit'
Consumer for queue queue_b quitting
Both consumers are done
免责声明:我是图书馆的作者