使用 aioamqp 一次从多个队列中消费

Consuming from multiple queues at once with aioamqp

是否可以使用带有 aioamqp 的一个通道一次使用多个队列?

免责声明:我在项目问题跟踪器中创建了一个 issue,但我真的很想知道我在做什么是否有意义。

我认为AMQP协议没有这个功能(假设我对协议的理解是正确的)。

如果您希望从队列中消费,您必须在频道上发出 basic.consume 调用。此调用所需的参数是 queue_name,它是一个 "blocking"(不是阻塞连接而是阻塞通道)调用,其中响应是来自队列的对象。

长话短说:每个消费者在等待队列对象时都必须独占访问通道。

好吧,所以我最初的想法是不正确的。在深入研究 AMQP 之后,我发现它实际上支持一个渠道上的多个消费者。但是,如果需要,它确实允许服务器设置它们的限制。不幸的是,我找不到任何关于 RabbitMQ 特定案例的信息。所以我假设没有这样的限制。总而言之:这是库的问题。

但是解决方法仍然有效:只需为每个消费者创建一个频道。它应该工作得很好。

ammoo 有效:

import asyncio

from ammoo import connect


async def consume(channel, queue_name):
    async with channel.consume(queue_name, no_ack=True) as consumer:
        async for message in consumer:
            print('Message from {}: {}'.format(queue_name, message.body))
            if message.body == b'quit':
                print('Consumer for queue {} quitting'.format(queue_name))
                break


async def main():
    async with await connect('amqp://localhost/') as connection:
        async with connection.channel() as channel:
            await channel.declare_queue('queue_a')
            await channel.declare_queue('queue_b')

            await asyncio.gather(
                consume(channel, 'queue_a'),
                consume(channel, 'queue_b')
            )
            print('Both consumers are done')



if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main())

输出:

# python3 test.py 
Message from queue_a: b'hello queue a'
Message from queue_b: b'hello queue b'
Message from queue_a: b'quit'
Consumer for queue queue_a quitting
Message from queue_b: b'another message for queue b'
Message from queue_b: b'quit'
Consumer for queue queue_b quitting
Both consumers are done

免责声明:我是图书馆的作者