从异步函数监视 multiprocessing.pool.Pool 对象

Monitor a multiprocessing.pool.Pool object from an async function

我有一个由 discord 机器人管理的 multiprocessing.pool.Pool。由于 discord.py 是异步的,我使用 pool.starmap_async()pool.apply_async()AsyncResult.get() 来管理任务。机器人在启动时启动一个池(这可能看起来很奇怪,但由于初始化时间长,这是我正在做的最有效的方法)

有什么方法可以在任何给定时刻检查池中当前 queued/executing 有多少个进程?检查函数可以访问 Pool 本身,但不能访问任何 AsyncResults.

我也对产生相同结果的其他方法持开放态度,即池中 active/queued 个进程的数量。

Since discord.py is async, i use pool.starmap_async(), pool.apply_async() and AsyncResult.get() to manage tasks.

除非您采取特别的预防措施,否则这看起来不正确,因为 AsyncResult.get() 会阻塞事件循环。 async之类的方法名中的async并不是asyncio部署的那种async。多处理使用同步代码与子进程通信,在后台线程中执行此操作以允许您的代码继续处理其他事情。

结合 asyncio 和多​​处理的更安全的方法是 concurrent.futures module which provides the ProcessPoolExecutor class that uses multiprocessing internally, and whose executors asyncio supports via run_in_executor

is there a way i can check on how many processes are currently queued/executing in the pool at any given moment?

我认为没有 public API 可以查询该信息,但如果您拥有该池,则可以轻松维护必要的统计信息。例如(未经测试):

class Pool:
    def __init__(self, nworkers):
        self._executor = concurrent.futures.ProcessPoolExecutor(nworkers)
        self._nworkers = nworkers
        self._submitted = 0

    async def submit(self, fn, *args):
        self._submitted += 1
        loop = asyncio.get_event_loop()
        fut = loop.run_in_executor(self._executor, fn, *args)
        try:
            return await fut
        finally:
            self._submitted -= 1

    def stats(self):
        queued = max(0, self._submitted - self._nworkers)
        executing = min(self._submitted, self._nworkers)
        return queued, executing

您可以通过调用 submit() 来使用它,您可以等待它立即获得结果,或者传递给 create_task() 以获得您可以稍后等待的未来或 gather() 以及其他期货等