Python CancelledError with asyncio queue

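I am using the code from an answer, but I get asyncio.exceptions.CancelledError when the queue is empty. In the real project the consumers themselves add tasks to the queue, which is why I use a while True statement.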

I condensed the code to make debugging easier:

import asyncio
import traceback


async def consumer(queue: asyncio.Queue):
    try:
        while True:
            number = await queue.get()  # the exception is raised here
            queue.task_done()
            print(f'consumed {number}')
    except BaseException:
        traceback.print_exc()


async def main():
    queue = asyncio.Queue()
    for i in range(3):
        await queue.put(i)
    consumers = [asyncio.create_task(consumer(queue)) for _ in range(1)]
    await queue.join()
    for c in consumers:
        c.cancel()


asyncio.run(main())

Error:

consumed 0
consumed 1
consumed 2
Traceback (most recent call last):
  File "/Users/abionics/Downloads/BaseAsyncScraper/ttt.py", line 8, in consumer
    number = await queue.get()
  File "/usr/local/Cellar/python@3.9/3.9.4/Frameworks/Python.framework/Versions/3.9/lib/python3.9/asyncio/queues.py", line 166, in get
    await getter
asyncio.exceptions.CancelledError

By the way, the documentation for queue.get() says "If queue is empty, wait until an item is available". What is the real cause of this error? Is there perhaps a better solution?

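The reason is that you cancelled the task. The consumer is suspended in queue.get() on the empty queue at that moment, so that is where the CancelledError surfaces: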

Task.cancel:

Request the Task to be cancelled.

This arranges for a CancelledError exception to be thrown into the wrapped coroutine on the next cycle of the event loop.
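To make the quoted behaviour concrete, here is a minimal sketch of a task that is cancelled while it waits on an empty queue. The names waiter, demo and log are hypothetical and exist only for this illustration; the point is that the exception appears at the await, not at the cancel() call.

```python
import asyncio


async def waiter(queue: asyncio.Queue, log: list):
    try:
        await queue.get()  # suspends here: the queue never receives an item
    except asyncio.CancelledError:
        log.append("cancelled inside queue.get()")
        raise  # re-raise so the task really ends up in the cancelled state


async def demo():
    queue = asyncio.Queue()
    log = []  # hypothetical helper list that records what happened
    task = asyncio.create_task(waiter(queue, log))
    await asyncio.sleep(0)  # give the task a chance to start and block on get()
    task.cancel()           # only requests cancellation, nothing is raised here
    try:
        await task          # the CancelledError propagates to whoever awaits the task
    except asyncio.CancelledError:
        log.append("task finished as cancelled")
    return log, task.cancelled()


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

So the traceback in the question is not a failure of queue.get(); it is the cancellation requested in main() being delivered to the consumer.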

You have a few options for handling this:

1. Use asyncio.gather

If return_exceptions is True, exceptions are treated the same as successful results, and aggregated in the result list.

await queue.join()

for c in consumers:
    c.cancel()

await asyncio.gather(*consumers, return_exceptions=True)
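For reference, a self-contained sketch of how this first option could fit into the code from the question. The seen list and the return value of main() are assumptions added here so the outcome can be inspected; they are not part of the original code.

```python
import asyncio


async def consumer(queue: asyncio.Queue, seen: list):
    # No try/except here: cancellation is allowed to end the task.
    while True:
        number = await queue.get()
        seen.append(number)  # hypothetical stand-in for the real work
        queue.task_done()


async def main():
    queue = asyncio.Queue()
    seen = []
    for i in range(3):
        await queue.put(i)
    consumers = [asyncio.create_task(consumer(queue, seen))]
    await queue.join()  # returns once every item has been marked done
    for c in consumers:
        c.cancel()
    # With return_exceptions=True the CancelledError of each consumer is
    # returned as a value in the result list instead of being raised.
    results = await asyncio.gather(*consumers, return_exceptions=True)
    return seen, results


if __name__ == "__main__":
    print(asyncio.run(main()))
```

With this approach the consumers stay simple, and main() waits until every cancelled task has actually finished before it returns.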

2. Catch the exception in the consumer

import asyncio

async def consumer(q):
    while True:
        try:
            # Cancellation is delivered here, while the task waits on an empty queue.
            num = await q.get()
        except asyncio.CancelledError:
            print("Exiting...")
            break

        print(f"Working on: {num}")
        q.task_done()

3. Suppress the exception

import asyncio
from contextlib import suppress

async def consumer(q):
    with suppress(asyncio.CancelledError):
        while True:
            num = await q.get()
            print(f"Working on: {num}")
            q.task_done()
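One thing worth knowing about the suppress variant: because the consumer swallows the CancelledError, the task finishes normally rather than in the cancelled state. A minimal sketch showing this, where the seen list and the return value of main() are hypothetical additions for illustration:

```python
import asyncio
from contextlib import suppress


async def consumer(q: asyncio.Queue, seen: list):
    with suppress(asyncio.CancelledError):
        while True:
            num = await q.get()
            seen.append(num)  # hypothetical stand-in for the real work
            q.task_done()


async def main():
    q = asyncio.Queue()
    seen = []
    for i in range(3):
        q.put_nowait(i)
    task = asyncio.create_task(consumer(q, seen))
    await q.join()
    task.cancel()
    await task  # does not raise: the consumer suppressed the CancelledError
    return seen, task.cancelled()


if __name__ == "__main__":
    print(asyncio.run(main()))
```

Here task.cancelled() is False even though cancel() was called. That is fine for a simple worker like this one, but it matters if other code relies on the cancelled state of the task.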