令人困惑的异步任务取消行为

Confusing asyncio task cancellation behavior

我对以下 asyncio 代码的行为感到困惑:

import time
import asyncio
from threading import Thread
import logging

logger = logging.getLogger(__name__)
logging.basicConfig(level=logging.INFO)

event_loop = None
q = None

# queue items processing
async def _main():
    global event_loop, q
    q = asyncio.Queue(maxsize=5)
    event_loop = asyncio.get_running_loop()
    try:
        while True:
            try:
                new_data = await asyncio.wait_for(q.get(), timeout=1)
                logger.info(new_data)
                q.task_done()
            except asyncio.TimeoutError:
                logger.warning(f'timeout - main cancelled? {asyncio.current_task().cancelled()}')
    except asyncio.CancelledError:
        logger.warning(f'cancelled')
        raise

def _event_loop_thread():
    try:
        asyncio.run(_main(), debug=True)
    except asyncio.CancelledError:
        logger.warning('main was cancelled')

thread = Thread(target=_event_loop_thread)
thread.start()

# wait for the event loop to start
while not event_loop:
    time.sleep(0.1)

async def _push(a):
    try:
        try:
            await q.put(a)
            await asyncio.sleep(0.1)
        except asyncio.QueueFull:
            logger.warning('q full')
    except asyncio.CancelledError:
        logger.warning('push cancelled')
        raise

# push some stuff to the queue
for i in range(10):
    future = asyncio.run_coroutine_threadsafe(_push(f'processed {i}'), event_loop)

pending_tasks = asyncio.all_tasks(loop=event_loop)
# cancel each pending task
for task in pending_tasks:
    logger.info(f'killing task {task.get_coro()}')
    event_loop.call_soon_threadsafe(task.cancel)

logger.info('finished')

产生以下输出:

INFO:__main__:killing task <coroutine object _main at 0x7f7ff05d6a40>
INFO:__main__:killing task <coroutine object _push at 0x7f7fefd17140>
INFO:__main__:killing task <coroutine object _push at 0x7f7fefd0fbc0>
INFO:__main__:killing task <coroutine object Queue.get at 0x7f7fefd7dd40>
INFO:__main__:killing task <coroutine object _push at 0x7f7fefd170c0>
INFO:__main__:finished
INFO:__main__:processed 0
WARNING:__main__:push cancelled
WARNING:__main__:push cancelled
WARNING:__main__:push cancelled
INFO:__main__:processed 1
INFO:__main__:processed 2
INFO:__main__:processed 3
INFO:__main__:processed 4
INFO:__main__:processed 5
INFO:__main__:processed 6
INFO:__main__:processed 7
INFO:__main__:processed 8
INFO:__main__:processed 9
WARNING:__main__:timeout - main cancelled? False
WARNING:__main__:timeout - main cancelled? False
WARNING:__main__:timeout - main cancelled? False
WARNING:__main__:timeout - main cancelled? False
WARNING:__main__:timeout - main cancelled? False

为什么 _main() coro 永远不会被取消?我查看了 asyncio 文档,但没有发现任何暗示可能发生的事情。

此外,如果替换行:

new_data = await asyncio.wait_for(q.get(), timeout=1)

有:

new_data = await q.get()

一切如预期。 _main() 和所有其他任务被正确取消。所以这似乎是 async.wait_for().

的问题

我想在这里做的是有一个生产者/消费者模型,其中消费者是 asyncio 事件循环中的 _main() 任务(运行 在一个单独的线程中),主线程是生产者(使用 _push())。

谢谢

不幸的是,您偶然发现了 asyncio 包中的一个突出错误:https://bugs.python.org/issue42130。如您所见,asyncio.wait_for 在某些情况下可以抑制 CancelledError。当取消发生时传递给 wait_for 的等待对象实际上已经完成时,就会发生这种情况; wait_for 然后 returns 等待的结果而不传播取消。 (我也是通过艰难的方式了解到这一点的。)

目前(据我所知)唯一可用的修复方法是避免在任何可以取消的协程中使用 wait_for。也许在您的情况下,您可以简单地 await q.get() 而不必担心超时的可能性。

我想顺便指出,您的程序具有严重的不确定性。我的意思是您没有在两个线程之间同步 activity - 这会产生一些 st运行ge 后果。例如,您是否注意到,您基于 _push 协程创建了 10 个任务,但您只取消了其中的 3 个?发生这种情况是因为您向第二个线程创建了 10 个任务:

# push some stuff to the queue
for i in range(10):
    future = asyncio.run_coroutine_threadsafe(_push(f'processed {i}'), event_loop)

但没有等待任何返回的期货,您立即开始取消任务:

pending_tasks = asyncio.all_tasks(loop=event_loop)
# cancel each pending task
for task in pending_tasks:
    logger.info(f'killing task {task.get_coro()}')
    event_loop.call_soon_threadsafe(task.cancel)

显然第二个线程还没有完成所有任务的创建,所以你的任务取消逻辑是偶然的。

在两个线程之间分配 CPU 时间片是一个 OS 函数,如果您希望不同线程中的事情以特定顺序发生,您必须编写显式逻辑。当我 运行 你在我的机器上的确切代码时 (python3.10, Windows 10) 我得到的行为与你报告的完全不同。

事实证明,这不是真正的问题,但很难对每次都执行相同操作的程序进行故障排除。