python asyncio - cancelling a `to_thread` task won't stop the thread
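With the snippet below, I don't understand why `infiniteTask` is not cancelled (it keeps spamming "I'm still standing").
In debug mode, I can see that the Task stored in `unfinished` is indeed marked as cancelled, but the thread obviously isn't cancelled or terminated.
Why isn't the thread killed when I cancel the wrapping task?
What should I do to stop the thread?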
import time
import asyncio

def quickTask():
    time.sleep(1)

def infiniteTask():
    while True:
        time.sleep(1)
        print("I'm still standing")

async def main():
    finished, unfinished = await asyncio.wait({
            asyncio.create_task(asyncio.to_thread(quickTask)),
            asyncio.create_task(asyncio.to_thread(infiniteTask))
        },
        return_when=asyncio.FIRST_COMPLETED
    )
    for task in unfinished:
        task.cancel()
    await asyncio.wait(unfinished)
    print("  finished : " + str(len(finished)))    # prints 1
    print("unfinished : " + str(len(unfinished)))  # prints 1

asyncio.run(main())
Cause
If we check the definition of `asyncio.to_thread()`:
# python310/Lib/asyncio/threads.py
# ...
async def to_thread(func, /, *args, **kwargs):
    """Asynchronously run function *func* in a separate thread.

    Any *args and **kwargs supplied for this function are directly passed
    to *func*. Also, the current :class:`contextvars.Context` is propagated,
    allowing context variables from the main thread to be accessed in the
    separate thread.

    Return a coroutine that can be awaited to get the eventual result of *func*.
    """
    loop = events.get_running_loop()
    ctx = contextvars.copy_context()
    func_call = functools.partial(ctx.run, func, *args, **kwargs)
    return await loop.run_in_executor(None, func_call)
It is really just a wrapper around `loop.run_in_executor`.
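As a quick way to see the wrapper relationship, the sketch below runs the same function once through `asyncio.to_thread` and once through `loop.run_in_executor(None, ...)` and reports the thread each call ran on. The helper name `where_am_i` is made up for this illustration; the exact worker thread names are a CPython implementation detail and may differ between versions.

```python
import asyncio
import threading


def where_am_i() -> str:
    # Runs in a worker thread and reports that thread's name.
    return threading.current_thread().name


async def main() -> tuple[str, str]:
    loop = asyncio.get_running_loop()
    via_to_thread = await asyncio.to_thread(where_am_i)
    # Passing None selects the loop's default executor, as to_thread does.
    via_executor = await loop.run_in_executor(None, where_am_i)
    return via_to_thread, via_executor


names = asyncio.run(main())
print(names)
```

Both calls should report a worker thread of the default executor rather than the main thread.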
If we then look at how asyncio's own tests handle `run_in_executor`:
# python310/Lib/test/test_asyncio/test_events.py
# ...
class EventLoopTestsMixin:
    # ...
    def test_run_in_executor_cancel(self):
        called = False

        def patched_call_soon(*args):
            nonlocal called
            called = True

        def run():
            time.sleep(0.05)

        f2 = self.loop.run_in_executor(None, run)
        f2.cancel()
        self.loop.run_until_complete(
                self.loop.shutdown_default_executor())
        self.loop.close()
        self.loop.call_soon = patched_call_soon
        self.loop.call_soon_threadsafe = patched_call_soon
        time.sleep(0.4)
        self.assertFalse(called)
You can see that it waits for `self.loop.shutdown_default_executor()`.
Now let's see what that looks like.
# python310/Lib/asyncio/base_events.py
# ...
class BaseEventLoop(events.AbstractEventLoop):
    # ...
    async def shutdown_default_executor(self):
        """Schedule the shutdown of the default executor."""
        self._executor_shutdown_called = True
        if self._default_executor is None:
            return
        future = self.create_future()
        thread = threading.Thread(target=self._do_shutdown, args=(future,))
        thread.start()
        try:
            await future
        finally:
            thread.join()

    def _do_shutdown(self, future):
        try:
            self._default_executor.shutdown(wait=True)
            self.call_soon_threadsafe(future.set_result, None)
        except Exception as ex:
            self.call_soon_threadsafe(future.set_exception, ex)
Here we can see that it creates another thread to run `_do_shutdown`, which in turn calls `self._default_executor.shutdown` with `wait=True`.
Then, where the shutdown is implemented:
# Python310/Lib/concurrent/futures/thread.py
# ...
class ThreadPoolExecutor(_base.Executor):
    # ...
    def shutdown(self, wait=True, *, cancel_futures=False):
        with self._shutdown_lock:
            self._shutdown = True
            if cancel_futures:
                # Drain all work items from the queue, and then cancel their
                # associated futures.
                while True:
                    try:
                        work_item = self._work_queue.get_nowait()
                    except queue.Empty:
                        break
                    if work_item is not None:
                        work_item.future.cancel()

            # Send a wake-up to prevent threads calling
            # _work_queue.get(block=True) from permanently blocking.
            self._work_queue.put(None)
        if wait:
            for t in self._threads:
                t.join()
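When `wait=True`, it simply waits for all threads to stop on their own. Even `cancel_futures=True` only cancels work items that are still queued, not ones that are already running.
Nowhere in any of this is there an attempt to actually cancel a running thread. Trio's documentation for `trio.to_thread.run_sync` describes the same limitation: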
Cancellation is a tricky issue here, because neither Python nor the operating systems it runs on provide any general mechanism for cancelling an arbitrary synchronous function running in a thread. This function will always check for cancellation on entry, before starting the thread. But once the thread is running, there are two ways it can handle being cancelled:
- If cancellable=False, the function ignores the cancellation and keeps going, just like if we had called sync_fn synchronously. This is the default behavior.
- If cancellable=True, then this function immediately raises Cancelled. In this case the thread keeps running in background – we just abandon it to do whatever it’s going to do, and silently discard any return value or errors that it raises.
So from all of this we can conclude that there is no way to forcibly terminate an infinite loop running in a thread.
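The behaviour can be reproduced in a form that terminates. This is a minimal sketch, assuming CPython 3.9 or later: `slow_work` and `progress` are names invented for the demo, and a finite loop stands in for `infiniteTask` so the script can exit.

```python
import asyncio
import time

progress = []  # filled in by the worker thread


def slow_work():
    # Finite stand-in for infiniteTask so the demo terminates.
    for i in range(3):
        time.sleep(0.1)
        progress.append(i)


async def main() -> bool:
    task = asyncio.create_task(asyncio.to_thread(slow_work))
    await asyncio.sleep(0.05)   # give the thread time to start
    task.cancel()
    await asyncio.wait({task})  # returns once the task is cancelled
    return task.cancelled()


# asyncio.run() shuts down the default executor before returning,
# which joins the worker thread.
was_cancelled = asyncio.run(main())
print(was_cancelled, progress)
```

The task reports itself as cancelled, yet `progress` still contains all three items: the thread ran to completion regardless, and `asyncio.run()` waited for it on the way out.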
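Workaround
Since we now know that we have to be more careful about what we run in a thread, we need a way to signal to the thread that we want it to stop.
For this case we can use an Event.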
import time
import asyncio
import threading

def blocking_func(event: threading.Event):
    # Re-check the stop flag once per iteration.
    while not event.is_set():
        time.sleep(1)
        print("I'm still standing")

async def main():
    # threading.Event is safe to share across threads; asyncio.Event is not.
    event = threading.Event()
    asyncio.create_task(asyncio.to_thread(blocking_func, event))
    await asyncio.sleep(5)
    # now let's stop
    event.set()

asyncio.run(main())
By checking the event on every iteration of the loop, we can see that the program terminates normally.
I'm still standing
I'm still standing
I'm still standing
I'm still standing
I'm still standing
Process finished with exit code 0
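If you would rather keep calling `task.cancel()` than set the event by hand, one possible variation is to set the event whenever the wrapping coroutine exits, including on cancellation. This is only a sketch: `run_until_cancelled` and the `ticks` list are hypothetical names introduced here, and `Event.wait(timeout=...)` replaces `time.sleep` so the thread notices the stop request without finishing its delay.

```python
import asyncio
import threading
import time


def blocking_func(stop: threading.Event, ticks: list) -> None:
    # wait() acts as an interruptible sleep: it returns True as soon
    # as the event is set, which ends the loop.
    while not stop.wait(timeout=0.1):
        ticks.append(time.monotonic())


async def run_until_cancelled(ticks: list) -> None:
    # Hypothetical helper: ties task cancellation to the stop event.
    stop = threading.Event()
    try:
        await asyncio.to_thread(blocking_func, stop, ticks)
    finally:
        stop.set()  # runs on cancellation as well as on normal exit


async def main() -> list:
    ticks: list = []
    task = asyncio.create_task(run_until_cancelled(ticks))
    await asyncio.sleep(0.35)
    task.cancel()
    await asyncio.wait({task})
    return ticks


ticks = asyncio.run(main())
print(len(ticks))
```

This still relies on the function being cooperative. Code that blocks inside a call it does not control cannot be stopped this way.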