如何将特定数量的额外工作人员添加到现有的多处理池?

how to add specific number of additional workers to an exisiting multiprocessing pool?

在下面的情况下,我创建了一个包含两个工作人员的默认池并执行任务。在任务处理期间,task_queue 会定期检查,因此它不会超过特定的长度限制并防止 up/down 流混乱。如何动态添加更多worker来减少任务队列长度?

import multiprocessing as mp

... code snippet...

def main(poolsize, start_process):

    pool = mp.Pool(processes=poolsize, initializer=start_process)
    done = False

    task_queue = []

    while True:

        ... snippet code : do something ...

        if len(task_queue) >= 10:

            ... code to expand pool goes here...

        if done == True:
            break

    .. do final something ...

if __name__ == '__main__':

#    freeze_support()

    poolsize = 2

    main(poolsize)

要在 运行 池处理作业期间添加更多工人,您可以在 while 循环中添加以下函数:


def repopulate(pool, add_workers):

    current_pool_size = len(pool._pool)         # _.pool gets the current pool size.

    new_pool_size = current_pool_size + add_workers

    pool._processes = new_pool_size

    pool._repopulate_pool()

    return pool

main() 的 while 循环中:


if len(task_queue) >= 10:

    new_workers = 2

    repopulate(poolname, new_workers)