从异步函数监视 multiprocessing.pool.Pool 对象
Monitor a multiprocessing.pool.Pool object from an async function
我有一个由 discord 机器人管理的 multiprocessing.pool.Pool
。由于 discord.py 是异步的,我使用 pool.starmap_async()
、pool.apply_async()
和 AsyncResult.get()
来管理任务。机器人在启动时启动一个池(这可能看起来很奇怪,但由于初始化时间长,这是我正在做的最有效的方法)
有什么方法可以在任何给定时刻检查池中当前 queued/executing 有多少个进程?检查函数可以访问 Pool
本身,但不能访问任何 AsyncResult
s.
我也对产生相同结果的其他方法持开放态度,即池中 active/queued 个进程的数量。
Since discord.py is async, i use pool.starmap_async()
, pool.apply_async()
and AsyncResult.get()
to manage tasks.
除非您采取特别的预防措施,否则这看起来不正确,因为 AsyncResult.get()
会阻塞事件循环。 async之类的方法名中的async并不是asyncio部署的那种async。多处理使用同步代码与子进程通信,在后台线程中执行此操作以允许您的代码继续处理其他事情。
结合 asyncio 和多处理的更安全的方法是 concurrent.futures
module which provides the ProcessPoolExecutor
class that uses multiprocessing internally, and whose executors asyncio supports via run_in_executor
。
is there a way i can check on how many processes are currently queued/executing in the pool at any given moment?
我认为没有 public API 可以查询该信息,但如果您拥有该池,则可以轻松维护必要的统计信息。例如(未经测试):
class Pool:
def __init__(self, nworkers):
self._executor = concurrent.futures.ProcessPoolExecutor(nworkers)
self._nworkers = nworkers
self._submitted = 0
async def submit(self, fn, *args):
self._submitted += 1
loop = asyncio.get_event_loop()
fut = loop.run_in_executor(self._executor, fn, *args)
try:
return await fut
finally:
self._submitted -= 1
def stats(self):
queued = max(0, self._submitted - self._nworkers)
executing = min(self._submitted, self._nworkers)
return queued, executing
您可以通过调用 submit()
来使用它,您可以等待它立即获得结果,或者传递给 create_task()
以获得您可以稍后等待的未来或 gather()
以及其他期货等
我有一个由 discord 机器人管理的 multiprocessing.pool.Pool
。由于 discord.py 是异步的,我使用 pool.starmap_async()
、pool.apply_async()
和 AsyncResult.get()
来管理任务。机器人在启动时启动一个池(这可能看起来很奇怪,但由于初始化时间长,这是我正在做的最有效的方法)
有什么方法可以在任何给定时刻检查池中当前 queued/executing 有多少个进程?检查函数可以访问 Pool
本身,但不能访问任何 AsyncResult
s.
我也对产生相同结果的其他方法持开放态度,即池中 active/queued 个进程的数量。
Since discord.py is async, i use
pool.starmap_async()
,pool.apply_async()
andAsyncResult.get()
to manage tasks.
除非您采取特别的预防措施,否则这看起来不正确,因为 AsyncResult.get()
会阻塞事件循环。 async之类的方法名中的async并不是asyncio部署的那种async。多处理使用同步代码与子进程通信,在后台线程中执行此操作以允许您的代码继续处理其他事情。
结合 asyncio 和多处理的更安全的方法是 concurrent.futures
module which provides the ProcessPoolExecutor
class that uses multiprocessing internally, and whose executors asyncio supports via run_in_executor
。
is there a way i can check on how many processes are currently queued/executing in the pool at any given moment?
我认为没有 public API 可以查询该信息,但如果您拥有该池,则可以轻松维护必要的统计信息。例如(未经测试):
class Pool:
def __init__(self, nworkers):
self._executor = concurrent.futures.ProcessPoolExecutor(nworkers)
self._nworkers = nworkers
self._submitted = 0
async def submit(self, fn, *args):
self._submitted += 1
loop = asyncio.get_event_loop()
fut = loop.run_in_executor(self._executor, fn, *args)
try:
return await fut
finally:
self._submitted -= 1
def stats(self):
queued = max(0, self._submitted - self._nworkers)
executing = min(self._submitted, self._nworkers)
return queued, executing
您可以通过调用 submit()
来使用它,您可以等待它立即获得结果,或者传递给 create_task()
以获得您可以稍后等待的未来或 gather()
以及其他期货等