Multiprocessing best practice for large number of tasks?

A few days ago I asked about the best way to do multiprocessing for my particular problem:

Based on that, I ended up with the following:

nparallel = set_number_of_cores(nparallel)
pool = concurrent.futures.ProcessPoolExecutor(max_workers=nparallel)

args = [(f"{path_in}/{pdb_id}.pdb", f"{path_calc}/{pdb_id}", path_FFT, path_finished,
         path_failed, LOG, remove_path_tmp_on_finish) for pdb_id in ids_to_run]
tasks = [pool.submit(relax_fnc,*arg) for arg in args]
for task in tqdm.tqdm(concurrent.futures.as_completed(tasks), total=len(tasks),
                      smoothing=0.0, desc='Energy Minimization on Folder'):
    pass

This looked fine when I tested it on a smaller sample, but now that I am trying to run it on the actual problem, it just hangs at:

tasks = [pool.submit(relax_fnc,*arg) for arg in args]

I think the problem is that I am submitting too many jobs. Is there an easy way around this? I imagine I could create an initial pool of jobs, say 10 times the number of processes I am using, and then dynamically submit more as tasks finish, but that doesn't seem very pretty.

So I am hoping someone has a better solution.

(I am currently running about 300,000 tasks.)
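For reference, the "initial window, refill on completion" idea mentioned above can be sketched as follows (my own minimal sketch, not from the question; `submit_windowed` and `square` are hypothetical names, and `concurrent.futures.wait` is used to detect completions):

```python
import concurrent.futures
import itertools

def submit_windowed(pool, fn, params, window_size):
    """Keep at most window_size tasks outstanding, refilling as they finish.
    Yields each future as it completes."""
    params = iter(params)
    window = {pool.submit(fn, p) for p in itertools.islice(params, window_size)}
    while window:
        done, window = concurrent.futures.wait(
            window, return_when=concurrent.futures.FIRST_COMPLETED)
        yield from done
        # Top the window back up with one new task per completed task:
        window |= {pool.submit(fn, p) for p in itertools.islice(params, len(done))}

def square(x):
    return x * x

if __name__ == '__main__':
    with concurrent.futures.ProcessPoolExecutor(max_workers=4) as pool:
        futures = submit_windowed(pool, square, range(1000), window_size=40)
        results = sorted(f.result() for f in futures)
    print(len(results))  # 1000
```

This works, but as the question says, it is not pretty; the answer below wraps the same bounding idea into the executor itself.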

The following class, BoundedQueueProcessPoolExecutor, will by default ensure that the pool's task queue never has more than N tasks queued waiting to run, where N by default is the pool size (but you can initialize an instance of this class with the max_waiting_tasks keyword argument to specify any positive integer).

Consequently, this class keeps you from creating all the future instances more or less at once; they are created only as execution proceeds and tasks terminate. As a result, the as_completed method becomes less useful, and you should instead use a done-callback function, as in the following program:

import multiprocessing
import concurrent.futures
import tqdm


class BoundedQueuePoolExecutor:
    def __init__(self, semaphore):
        self.semaphore = semaphore

    def release(self, future):
        self.semaphore.release()

    def submit(self, fn, *args, **kwargs):
        # Blocks if the maximum number of queued tasks has been reached:
        self.semaphore.acquire()
        future = super().submit(fn, *args, **kwargs)
        future.add_done_callback(self.release)
        return future

class BoundedQueueProcessPoolExecutor(BoundedQueuePoolExecutor, concurrent.futures.ProcessPoolExecutor):
    def __init__(self, *args, max_waiting_tasks=None, **kwargs):
        concurrent.futures.ProcessPoolExecutor.__init__(self, *args, **kwargs)
        if max_waiting_tasks is None:
            max_waiting_tasks = self._max_workers
        elif max_waiting_tasks < 0:
            raise ValueError(f'Invalid negative max_waiting_tasks value: {max_waiting_tasks}')
        BoundedQueuePoolExecutor.__init__(self, multiprocessing.BoundedSemaphore(self._max_workers + max_waiting_tasks))


def worker(x):
    # simulate taking time:
    import time
    time.sleep(1)
    return x ** 2

def my_callback(future):
    bar.update()
    # This demo is storing results but yours may not be
    # storing 300K results:
    results.append(future.result())

# Required for Windows:
if __name__ == '__main__':
    params = list(range(1, 101))
    results = []
    bar = tqdm.tqdm(total=len(params))
    with BoundedQueueProcessPoolExecutor() as executor:
        for x in params:
            future = executor.submit(worker, x)
            future.add_done_callback(my_callback)

A second demo with 300,000 tasks (results not stored):

import multiprocessing
import concurrent.futures
import tqdm


class BoundedQueuePoolExecutor:
    def __init__(self, semaphore):
        self.semaphore = semaphore

    def release(self, future):
        self.semaphore.release()

    def submit(self, fn, *args, **kwargs):
        # Blocks if the maximum number of queued tasks has been reached:
        self.semaphore.acquire()
        future = super().submit(fn, *args, **kwargs)
        future.add_done_callback(self.release)
        return future

class BoundedQueueProcessPoolExecutor(BoundedQueuePoolExecutor, concurrent.futures.ProcessPoolExecutor):
    def __init__(self, *args, max_waiting_tasks=None, **kwargs):
        concurrent.futures.ProcessPoolExecutor.__init__(self, *args, **kwargs)
        if max_waiting_tasks is None:
            max_waiting_tasks = self._max_workers
        elif max_waiting_tasks < 0:
            raise ValueError(f'Invalid negative max_waiting_tasks value: {max_waiting_tasks}')
        BoundedQueuePoolExecutor.__init__(self, multiprocessing.BoundedSemaphore(self._max_workers + max_waiting_tasks))


def worker(x):
    return x ** 2

def my_callback(future):
    bar.update()

# Required for Windows:
if __name__ == '__main__':
    params = range(1, 300_001)
    bar = tqdm.tqdm(total=300_000)
    with BoundedQueueProcessPoolExecutor() as executor:
        for x in params:
            future = executor.submit(worker, x)
            future.add_done_callback(my_callback)

Note

Neither of these demos is really a good fit for multiprocessing, i.e. they do not run significantly faster than a serial program would; the worker functions are too trivial for the multiprocessing overhead to pay off.
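As an aside (my addition, not part of the original answer): when the individual tasks really are this trivial, `ProcessPoolExecutor.map` with its `chunksize` argument amortizes the per-task IPC cost by shipping many arguments per dispatch. Note, however, that `map` still submits all the chunk futures up front, so it does not bound the queue the way the class above does:

```python
import concurrent.futures

def square(x):
    return x * x

if __name__ == '__main__':
    with concurrent.futures.ProcessPoolExecutor() as pool:
        # Each dispatch carries 1,000 arguments instead of one,
        # cutting 300,000 task submissions down to 300 chunks:
        results = list(pool.map(square, range(300_000), chunksize=1_000))
    print(results[:3])  # [0, 1, 4]
```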