python asyncio - cancelling a `to_thread` task won't stop the thread

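With the following snippet, I can't figure out why infiniteTask is not cancelled (it keeps spamming "I'm still standing").

In debug mode, I can see that the Task stored in unfinished is indeed marked as cancelled, but the thread itself is clearly not cancelled or killed.

Why isn't the thread killed when I cancel the wrapping task? What should I do to stop the thread?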

import time
import asyncio

def quickTask():
    time.sleep(1)

def infiniteTask():
    while True:
        time.sleep(1)
        print("I'm still standing")

async def main():
    finished, unfinished = await asyncio.wait({
            asyncio.create_task(asyncio.to_thread(quickTask)),
            asyncio.create_task(asyncio.to_thread(infiniteTask))
        },
        return_when=asyncio.FIRST_COMPLETED
    )

    for task in unfinished:
        task.cancel()
    await asyncio.wait(unfinished)

    print("  finished : " + str(len(finished)))  # prints 1
    print("unfinished : " + str(len(unfinished)))  # prints 1

    
asyncio.run(main())

Reason

If we check the definition of asyncio.to_thread():

# python310/Lib/asyncio/threads.py
# ...

async def to_thread(func, /, *args, **kwargs):
    """Asynchronously run function *func* in a separate thread.

    Any *args and **kwargs supplied for this function are directly passed
    to *func*. Also, the current :class:`contextvars.Context` is propagated,
    allowing context variables from the main thread to be accessed in the
    separate thread.

    Return a coroutine that can be awaited to get the eventual result of *func*.
    """
    loop = events.get_running_loop()
    ctx = contextvars.copy_context()
    func_call = functools.partial(ctx.run, func, *args, **kwargs)
    return await loop.run_in_executor(None, func_call)

It is really just a wrapper around loop.run_in_executor.
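As a quick check of that claim, the sketch below runs the same function through both entry points and reports which thread executed it. The helper name where_am_i is made up for illustration; passing None to run_in_executor selects the loop's default executor, which is also what to_thread uses.

```python
import asyncio
import threading


def where_am_i():
    # Report the name of the thread this function is running on.
    return threading.current_thread().name


async def main():
    loop = asyncio.get_running_loop()
    # Both calls hand the function to the loop's default thread pool.
    via_to_thread = await asyncio.to_thread(where_am_i)
    via_executor = await loop.run_in_executor(None, where_am_i)
    return via_to_thread, via_executor


names = asyncio.run(main())
print(names)
```

In both cases the function runs on a pool worker thread rather than on the main thread, so whatever applies to cancelling a run_in_executor future applies to to_thread as well.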

If we then look at how asyncio's own tests handle cancelling run_in_executor:

# python310/Lib/test/test_asyncio/test_events.py
# ...

class EventLoopTestsMixin:
    # ...

    def test_run_in_executor_cancel(self):
        called = False

        def patched_call_soon(*args):
            nonlocal called
            called = True

        def run():
            time.sleep(0.05)

        f2 = self.loop.run_in_executor(None, run)
        f2.cancel()
        self.loop.run_until_complete(
                self.loop.shutdown_default_executor())
        self.loop.close()
        self.loop.call_soon = patched_call_soon
        self.loop.call_soon_threadsafe = patched_call_soon
        time.sleep(0.4)
        self.assertFalse(called)

You can see that it waits for self.loop.shutdown_default_executor().

Now let's see what that looks like.

# python310/Lib/asyncio/base_events.py
# ...

class BaseEventLoop(events.AbstractEventLoop):
    # ...

    async def shutdown_default_executor(self):
        """Schedule the shutdown of the default executor."""
        self._executor_shutdown_called = True
        if self._default_executor is None:
            return
        future = self.create_future()
        thread = threading.Thread(target=self._do_shutdown, args=(future,))
        thread.start()
        try:
            await future
        finally:
            thread.join()

    def _do_shutdown(self, future):
        try:
            self._default_executor.shutdown(wait=True)
            self.call_soon_threadsafe(future.set_result, None)
        except Exception as ex:
            self.call_soon_threadsafe(future.set_exception, ex)

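Here we can see that it starts another thread to run _do_shutdown, which in turn calls self._default_executor.shutdown with wait=True, and then awaits the result.

Next, where the shutdown is actually carried out: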

# Python310/Lib/concurrent/futures/thread.py
# ...

class ThreadPoolExecutor(_base.Executor):
    # ...

    def shutdown(self, wait=True, *, cancel_futures=False):
        with self._shutdown_lock:
            self._shutdown = True
            if cancel_futures:
                # Drain all work items from the queue, and then cancel their
                # associated futures.
                while True:
                    try:
                        work_item = self._work_queue.get_nowait()
                    except queue.Empty:
                        break
                    if work_item is not None:
                        work_item.future.cancel()

            # Send a wake-up to prevent threads calling
            # _work_queue.get(block=True) from permanently blocking.
            self._work_queue.put(None)
        if wait:
            for t in self._threads:
                t.join()

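With wait=True, it simply joins every worker thread, that is, it waits for all of them to finish on their own.

Nowhere in any of this do we see an attempt to actually cancel a running thread.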
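This can be observed directly. The sketch below is a bounded stand-in for the code in the question (bounded_task and the ticks list are names invented for this demo): the task is cancelled shortly after the thread starts, yet the thread keeps appending to the list until it finishes by itself.

```python
import asyncio
import time

ticks = []  # appended to by the worker thread


def bounded_task():
    # Stand-in for infiniteTask, limited to 5 iterations so the demo ends.
    for i in range(5):
        time.sleep(0.1)
        ticks.append(i)


async def main():
    task = asyncio.create_task(asyncio.to_thread(bounded_task))
    await asyncio.sleep(0.15)  # give the thread time to start working
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        pass
    # Snapshot how far the thread had got when the task was cancelled.
    return task.cancelled(), len(ticks)


task_cancelled, ticks_at_cancel = asyncio.run(main())
print(task_cancelled, ticks_at_cancel, len(ticks))
```

The task reports itself as cancelled, but by the time asyncio.run returns (it shuts down the default executor and waits for its threads) the list holds all 5 entries: the thread was never interrupted.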

Quoting the Trio documentation:

Cancellation is a tricky issue here, because neither Python nor the operating systems it runs on provide any general mechanism for cancelling an arbitrary synchronous function running in a thread. This function will always check for cancellation on entry, before starting the thread. But once the thread is running, there are two ways it can handle being cancelled:

  • If cancellable=False, the function ignores the cancellation and keeps going, just like if we had called sync_fn synchronously. This is the default behavior.
  • If cancellable=True, then this function immediately raises Cancelled. In this case the thread keeps running in background – we just abandon it to do whatever it’s going to do, and silently discard any return value or errors that it raises.

So, from all of this, we can conclude that there is no way to forcibly terminate an infinite loop that is running in a thread.
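The same limitation shows up one level down, in concurrent.futures, which backs the default executor. In this small sketch (worker, started and release are names chosen for the demo), cancel() succeeds only for a work item that is still queued; for one that is already executing it returns False and the function carries on.

```python
import concurrent.futures
import threading

started = threading.Event()  # set once the first worker call is running
release = threading.Event()  # lets the running call finish


def worker():
    started.set()
    release.wait(timeout=5)
    return "done"


with concurrent.futures.ThreadPoolExecutor(max_workers=1) as pool:
    running = pool.submit(worker)  # picked up by the only worker thread
    queued = pool.submit(worker)   # has to wait in the queue
    started.wait(timeout=5)

    cancel_running = running.cancel()  # already executing: cannot cancel
    cancel_queued = queued.cancel()    # still pending: can be cancelled

    release.set()
    result = running.result(timeout=5)

print(cancel_running, cancel_queued, result)  # False True done
```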


Workaround

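Now that we know we have to be more careful about what we run in a thread, we need a way to signal to the thread that we want it to stop.

For this we can use an Event. Since the flag is read from another thread, threading.Event is the right choice here; asyncio.Event is not thread-safe.

import time
import asyncio
import threading


def blocking_func(event: threading.Event):
    # Check the stop flag once per iteration so the loop can exit cooperatively.
    while not event.is_set():
        time.sleep(1)
        print("I'm still standing")


async def main():
    # threading.Event is thread-safe; asyncio.Event is only meant to be
    # used from code running inside the event loop.
    event = threading.Event()
    task = asyncio.create_task(asyncio.to_thread(blocking_func, event))

    await asyncio.sleep(5)
    # now let's stop
    event.set()
    await task  # wait for the thread to notice the flag and return

asyncio.run(main())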

Because the event is checked on every iteration of the loop, the program now terminates normally.

I'm still standing
I'm still standing
I'm still standing
I'm still standing
I'm still standing

Process finished with exit code 0
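To tie this back to the question, where the wrapping task is cancelled rather than an event being set by hand, one option is a small wrapper that sets the stop flag whenever the awaiting coroutine exits, including on cancellation. This is only a sketch: run_stoppable is a hypothetical helper, not part of asyncio, and it assumes the blocking function accepts a threading.Event and polls it.

```python
import asyncio
import threading
import time

log = []  # filled in by the worker thread


def blocking_func(stop: threading.Event):
    # Poll the stop flag on each iteration and exit once it is set.
    while not stop.is_set():
        time.sleep(0.05)
        log.append("tick")
    log.append("stopped")


async def run_stoppable(func):
    # Hypothetical helper: run func(stop) in a thread and set the flag
    # when this coroutine finishes for any reason, cancellation included.
    stop = threading.Event()
    try:
        return await asyncio.to_thread(func, stop)
    finally:
        stop.set()


async def main():
    task = asyncio.create_task(run_stoppable(blocking_func))
    await asyncio.sleep(0.2)
    task.cancel()  # raises CancelledError inside run_stoppable
    try:
        await task
    except asyncio.CancelledError:
        pass
    return task.cancelled()


cancelled = asyncio.run(main())
print(cancelled, log[-1])
```

The thread still is not interrupted; it stops at its next check of the flag, so the delay is bounded by how long one iteration takes.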