How to terminate long-running computation (CPU bound task) in Python using asyncio and concurrent.futures.ProcessPoolExecutor?
Similar Question (but answer does not work for me):
Unlike the question linked above and the solution provided there, in my case the computation itself is rather long (CPU bound) and cannot be run in a loop to check whether some event has occurred.
A simplified version of the code follows:
import asyncio
import concurrent.futures as futures
import time


class Simulator:
    def __init__(self):
        self._loop = None
        self._lmz_executor = None
        self._tasks = []
        self._max_execution_time = time.monotonic() + 60
        self._long_running_tasks = []

    def initialise(self):
        # Initialise the main asyncio loop
        self._loop = asyncio.get_event_loop()
        self._loop.set_default_executor(
            futures.ThreadPoolExecutor(max_workers=3))

        # Run separate processes of long computation task
        self._lmz_executor = futures.ProcessPoolExecutor(max_workers=3)

    def run(self):
        self._tasks.extend(
            [self.bot_reasoning_loop(bot_id) for bot_id in [1, 2, 3]]
        )

        try:
            # Gather bot reasoner tasks
            _reasoner_tasks = asyncio.gather(*self._tasks)
            # Send the reasoner tasks to main monitor task
            asyncio.gather(self.sample_main_loop(_reasoner_tasks))
            self._loop.run_forever()
        except KeyboardInterrupt:
            pass
        finally:
            self._loop.close()

    async def sample_main_loop(self, reasoner_tasks):
        """This is the main monitor task"""
        await asyncio.wait_for(reasoner_tasks, None)
        for task in self._long_running_tasks:
            try:
                await asyncio.wait_for(task, 10)
            except asyncio.TimeoutError:
                print("Oops. Some long operation timed out.")
                task.cancel()  # Doesn't cancel and has no effect
                task.set_result(None)  # Doesn't seem to have an effect

        self._lmz_executor.shutdown()
        self._loop.stop()
        print('And now I am done. Yay!')

    async def bot_reasoning_loop(self, bot):
        import math

        _exec_count = 0
        _sleepy_time = 15
        _max_runs = math.floor(self._max_execution_time / _sleepy_time)

        self._long_running_tasks.append(
            self._loop.run_in_executor(
                self._lmz_executor, really_long_process, _sleepy_time))

        while time.monotonic() < self._max_execution_time:
            print("Bot#{}: thinking for {}s. Run {}/{}".format(
                bot, _sleepy_time, _exec_count, _max_runs))
            await asyncio.sleep(_sleepy_time)
            _exec_count += 1

        print("Bot#{} Finished Thinking".format(bot))


def really_long_process(sleepy_time):
    print("I am a really long computation.....")
    _large_val = 9729379273492397293479237492734 ** 344323
    print("I finally computed this large value: {}".format(_large_val))


if __name__ == "__main__":
    sim = Simulator()
    sim.initialise()
    sim.run()
The idea is to have a main simulation loop that runs and monitors three bot threads. Each of these bot threads then performs some reasoning, but also starts a really long background process using the ProcessPoolExecutor, which may end up running longer than their own threshold/max execution time for reasoning.
As you can see in the code above, I attempted to .cancel() these tasks when a timeout occurs. Although this doesn't actually cancel the computation, which keeps happening in the background, the asyncio loop doesn't terminate until all the long-running computations have finished.
How do I terminate such long-running CPU-bound computations within a method?
Other similar SO questions, but not necessarily related or helpful:
- asyncio: Is it possible to cancel a future been run by an Executor?
- How to terminate a single async task in multiprocessing if that single async task exceeds a threshold time in Python
- Asynchronous multiprocessing with a worker pool in Python: how to keep going after timeout?
Your attempted approach doesn't work because the futures returned by ProcessPoolExecutor are not cancellable once they have started executing. Although asyncio's run_in_executor tries to propagate the cancellation, it is simply ignored by Future.cancel once the task starts running.
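To see that limitation in isolation, here is a minimal sketch of my own (not from the original question or answer; the busy() helper and the sleep duration are arbitrary) showing that cancel() on an already-running ProcessPoolExecutor future simply returns False and the computation runs to completion:

import concurrent.futures as futures
import time

def busy():
    # Stand-in for a long CPU-bound computation that never yields control
    return sum(i * i for i in range(50_000_000))

if __name__ == "__main__":
    with futures.ProcessPoolExecutor(max_workers=1) as executor:
        fut = executor.submit(busy)
        time.sleep(1)            # give the worker time to pick up the task
        print(fut.cancel())      # False: a running future cannot be cancelled
        print(fut.result())      # the computation still runs to completion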
There is no fundamental reason for that. Unlike threads, processes can be safely terminated, so it would be perfectly possible for ProcessPoolExecutor.submit to return a future whose cancel terminated the corresponding process. Asyncio coroutines have well-defined cancellation semantics and could automatically make use of it. Unfortunately, ProcessPoolExecutor.submit returns a regular concurrent.futures.Future, which assumes the lowest common denominator of the underlying executors and treats a running future as untouchable.
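As a minimal illustration of the "processes can be safely terminated" point (my own sketch, with a made-up spin() helper), a CPU-bound worker process can be stopped from the outside, which no thread API offers:

import multiprocessing
import time

def spin():
    # CPU-bound loop that never checks for any cancellation event
    while True:
        pass

if __name__ == "__main__":
    p = multiprocessing.Process(target=spin)
    p.start()
    time.sleep(1)
    p.terminate()    # forcibly stops the worker process
    p.join()
    print("worker stopped, exit code:", p.exitcode)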
As a result, to cancel tasks executing in subprocesses, one must circumvent ProcessPoolExecutor altogether and manage one's own processes. The challenge is how to do that without reimplementing half of multiprocessing. One option offered by the standard library is to (ab)use multiprocessing.Pool for this purpose, since it supports reliable shutdown of worker processes. A CancellablePool could work as follows:
- Instead of spawning a fixed number of processes, spawn a fixed number of one-worker pools.
- Assign tasks to the pools from an asyncio coroutine. If the coroutine is cancelled while waiting for the task to finish in the other process, terminate the single-process pool and create a new one.
- Since everything is coordinated from the single asyncio thread, there is no need to worry about race conditions such as accidentally terminating a process that has already started executing another task. (This would have to be prevented if cancellation were to be supported in ProcessPoolExecutor.)
Here is a sample implementation of that idea:
import asyncio
import multiprocessing


class CancellablePool:
    def __init__(self, max_workers=3):
        self._free = {self._new_pool() for _ in range(max_workers)}
        self._working = set()
        self._change = asyncio.Event()

    def _new_pool(self):
        return multiprocessing.Pool(1)

    async def apply(self, fn, *args):
        """
        Like multiprocessing.Pool.apply_async, but:
         * is an asyncio coroutine
         * terminates the process if cancelled
        """
        while not self._free:
            await self._change.wait()
            self._change.clear()
        pool = usable_pool = self._free.pop()
        self._working.add(pool)

        loop = asyncio.get_event_loop()
        fut = loop.create_future()
        def _on_done(obj):
            loop.call_soon_threadsafe(fut.set_result, obj)
        def _on_err(err):
            loop.call_soon_threadsafe(fut.set_exception, err)
        pool.apply_async(fn, args, callback=_on_done, error_callback=_on_err)

        try:
            return await fut
        except asyncio.CancelledError:
            pool.terminate()
            usable_pool = self._new_pool()
        finally:
            self._working.remove(pool)
            self._free.add(usable_pool)
            self._change.set()

    def shutdown(self):
        for p in self._working | self._free:
            p.terminate()
        self._free.clear()
A minimalistic test case showing the cancellation:
def really_long_process():
    print("I am a really long computation.....")
    large_val = 9729379273492397293479237492734 ** 344323
    print("I finally computed this large value: {}".format(large_val))

async def main():
    loop = asyncio.get_event_loop()
    pool = CancellablePool()

    tasks = [loop.create_task(pool.apply(really_long_process))
             for _ in range(5)]
    for t in tasks:
        try:
            await asyncio.wait_for(t, 1)
        except asyncio.TimeoutError:
            print('task timed out and cancelled')
    pool.shutdown()

asyncio.get_event_loop().run_until_complete(main())
Note how the CPU usage never exceeds 3 cores, and how it starts dropping near the end of the test, which indicates that the processes are being terminated as expected.
To apply this to the code from the question, make self._lmz_executor an instance of CancellablePool and change self._loop.run_in_executor(...) to self._loop.create_task(self._lmz_executor.apply(...)).
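As a rough sketch of that last step (my own adaptation, not code from the answer), the relevant parts of the question's Simulator might end up looking something like the following, assuming CancellablePool from above is defined in the same module; the time budgets and condensed structure here are illustrative only:

import asyncio

def really_long_process(sleepy_time):
    # sleepy_time is unused, mirroring the question's signature
    print("I am a really long computation.....")
    large_val = 9729379273492397293479237492734 ** 344323
    print("I finally computed a value with {} digits".format(len(str(large_val))))

async def bot_reasoning_loop(pool, bot, sleepy_time=15, time_budget=5):
    loop = asyncio.get_event_loop()
    # was: self._loop.run_in_executor(self._lmz_executor, really_long_process, _sleepy_time)
    long_task = loop.create_task(pool.apply(really_long_process, sleepy_time))
    try:
        await asyncio.wait_for(long_task, time_budget)
    except asyncio.TimeoutError:
        # the cancellation propagated by wait_for makes apply() terminate the worker
        print("Bot#{}: long computation timed out; its process was terminated".format(bot))

async def main():
    pool = CancellablePool(max_workers=3)  # was: futures.ProcessPoolExecutor(max_workers=3)
    await asyncio.gather(*(bot_reasoning_loop(pool, b) for b in [1, 2, 3]))
    pool.shutdown()

if __name__ == "__main__":
    asyncio.get_event_loop().run_until_complete(main())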