aio_pika。消息从队列中随机丢失
aio_pika. Messages was random lost from queue
我有用于消息消费的 asyncronus 生成器。
import asyncio
queue = asyncio.Queue()
async def consume_gen(
self,
consume_from,
prefetch_count,
priority=None
):
async with self.channel_pool.acquire() as channel:
await channel.set_qos(prefetch_count=5)
self.amqp_queue = await channel.declare_queue(
'queue_name_for_consuming',
durable=True,
auto_delete=False
)
await self.amqp_queue.consume(
self.get_message, no_ack=False
)
await asyncio.sleep(0)
while True:
try:
message = self.queue.get_nowait()
yield message
except asyncio.queues.QueueEmpty:
await asyncio.sleep(1)
yield None
except GeneratorExit:
return
else:
return
这是我的生成器的回调函数,它响应获取队列消息并将其放入内部异步队列。
async def get_message(self, message):
await self.queue.put(message)
我的问题是有些消息随机从队列中消失。他们不会确认或卡在任何消费者中,因为我有所有步骤的日志。另外,我知道这些消息进入队列的内容,应该从中使用。如果能帮助解决我的问题,我们将不胜感激
解决。问题更容易。我的记录器应用程序没有足够的日志级别)
我有用于消息消费的 asyncronus 生成器。
import asyncio
queue = asyncio.Queue()
async def consume_gen(
self,
consume_from,
prefetch_count,
priority=None
):
async with self.channel_pool.acquire() as channel:
await channel.set_qos(prefetch_count=5)
self.amqp_queue = await channel.declare_queue(
'queue_name_for_consuming',
durable=True,
auto_delete=False
)
await self.amqp_queue.consume(
self.get_message, no_ack=False
)
await asyncio.sleep(0)
while True:
try:
message = self.queue.get_nowait()
yield message
except asyncio.queues.QueueEmpty:
await asyncio.sleep(1)
yield None
except GeneratorExit:
return
else:
return
这是我的生成器的回调函数,它响应获取队列消息并将其放入内部异步队列。
async def get_message(self, message):
await self.queue.put(message)
我的问题是有些消息随机从队列中消失。他们不会确认或卡在任何消费者中,因为我有所有步骤的日志。另外,我知道这些消息进入队列的内容,应该从中使用。如果能帮助解决我的问题,我们将不胜感激
解决。问题更容易。我的记录器应用程序没有足够的日志级别)