如何监控python的concurrent.futures.ProcessPoolExecutor?
How to monitor python's concurrent.futures.ProcessPoolExecutor?
我们在服务中使用 concurrent.futures
中的 ProcessPoolExecutor 异步接收请求,并在进程池中进行实际的同步处理。
一旦我们 运行 进入进程池耗尽的情况,因此新请求必须等到其他一些进程完成。
有没有办法查询进程池的当前使用情况?这将使我们能够监控它们的状态并进行适当的容量规划。
如果没有,是否有支持此类 monitoring/capacity 规划的具有异步接口的任何好的替代进程池实现?
最简单的方法是使用所需的行为扩展 ProcessPoolExecutor
。下面的示例维护 stdlib 接口并且不访问实现细节:
from concurrent.futures import ProcessPoolExecutor
class MyProcessPoolExecutor(ProcessPoolExecutor):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self._running_workers = 0
def submit(self, *args, **kwargs):
future = super().submit(*args, **kwargs)
self._running_workers += 1
future.add_done_callback(self._worker_is_done)
return future
def _worker_is_done(self, future):
self._running_workers -= 1
def get_pool_usage(self):
return self._running_workers
我最近以稍微不同的方式为自己解决了这个问题。简化了,这是我所做的:
- 我在主循环范围内定义的集合中从外部跟踪未决期货。
- 我为每个 future 附加了一个回调,这个回调是对 futures 集合的闭包,允许它在完成时从集合中删除 future。
所以,鉴于 done()
是 actual 回调函数,定义在别处,下面是在我的主循环范围内定义的:
bag = set()
def make_callback(b):
def callback(f):
nonlocal b
b.remove(f)
done(f)
return callback
对于我提交给 ProcessPoolExecutor 的每个未来 f
,我添加回调:
f.add_done_callback(make_callback(bag))
在任何时候,都可以通过查看 bag
的内容来查看待定和 运行 期货的列表,可选择根据期货的 running()
方法的结果进行过滤.例如:
print(*bag, sep='\n')
print('running:', *(f for f in bag if f.running()))
对于许多简单的用例,模块级设置变量可能与闭包一样有效。
我们在服务中使用 concurrent.futures
中的 ProcessPoolExecutor 异步接收请求,并在进程池中进行实际的同步处理。
一旦我们 运行 进入进程池耗尽的情况,因此新请求必须等到其他一些进程完成。
有没有办法查询进程池的当前使用情况?这将使我们能够监控它们的状态并进行适当的容量规划。
如果没有,是否有支持此类 monitoring/capacity 规划的具有异步接口的任何好的替代进程池实现?
最简单的方法是使用所需的行为扩展 ProcessPoolExecutor
。下面的示例维护 stdlib 接口并且不访问实现细节:
from concurrent.futures import ProcessPoolExecutor
class MyProcessPoolExecutor(ProcessPoolExecutor):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self._running_workers = 0
def submit(self, *args, **kwargs):
future = super().submit(*args, **kwargs)
self._running_workers += 1
future.add_done_callback(self._worker_is_done)
return future
def _worker_is_done(self, future):
self._running_workers -= 1
def get_pool_usage(self):
return self._running_workers
我最近以稍微不同的方式为自己解决了这个问题。简化了,这是我所做的:
- 我在主循环范围内定义的集合中从外部跟踪未决期货。
- 我为每个 future 附加了一个回调,这个回调是对 futures 集合的闭包,允许它在完成时从集合中删除 future。
所以,鉴于 done()
是 actual 回调函数,定义在别处,下面是在我的主循环范围内定义的:
bag = set()
def make_callback(b):
def callback(f):
nonlocal b
b.remove(f)
done(f)
return callback
对于我提交给 ProcessPoolExecutor 的每个未来 f
,我添加回调:
f.add_done_callback(make_callback(bag))
在任何时候,都可以通过查看 bag
的内容来查看待定和 运行 期货的列表,可选择根据期货的 running()
方法的结果进行过滤.例如:
print(*bag, sep='\n')
print('running:', *(f for f in bag if f.running()))
对于许多简单的用例,模块级设置变量可能与闭包一样有效。