大量任务的多处理最佳实践?
Multiprocessing best practice for large number of tasks?
几天前,我问过针对我的特定问题进行多处理的最佳方法是:
基于此我得到以下信息:
# Size the pool to the requested number of cores (project helper).
nparallel = set_number_of_cores(nparallel)
pool = concurrent.futures.ProcessPoolExecutor(max_workers=nparallel)
# Build one argument tuple per PDB id to be relaxed.
# NOTE(review): with ~300K ids this list itself is large — presumably fine,
# but the submit loop below is where the hang is reported.
args = [(f"{path_in}/{pdb_id}.pdb", f"{path_calc}/{pdb_id}", path_FFT, path_finished,
        path_failed, LOG, remove_path_tmp_on_finish) for pdb_id in ids_to_run]
# Submitting every task up front creates ~300K Future objects at once.
tasks = [pool.submit(relax_fnc,*arg) for arg in args]
# Progress bar advances in task-completion order.
for task in tqdm.tqdm(concurrent.futures.as_completed(tasks), total=len(tasks),
                      smoothing=0.0, desc='Energy Minimization on Folder'):
    pass
当我在较小的样本上测试时,这个方法看起来不错。
但现在我正试图运行它来解决实际问题,结果程序一直挂在:
tasks = [pool.submit(relax_fnc,*arg) for arg in args]
我想问题是我提交了太多作业。有什么方法可以轻松解决这个问题吗?我想我可以创建一个初始作业池,该作业池可能是我使用的进程数的 10 倍,然后在它们完成时动态提交更多任务,但这看起来不太漂亮。
所以我希望其他人有更好的解决方案。
(我目前要运行的大约有 30 万个任务)
以下 class、BoundedQueueProcessPoolExecutor
将默认确保处理池的任务队列不会有超过 N
个任务排队等待 运行,其中默认情况下 N
是池大小(但您可以使用 max_waiting_tasks 关键字参数初始化此 class 的实例并指定任何正整数) .
因此,这 class 会阻止您或多或少地同时创建所有未来实例,但仅在执行继续和任务终止时才创建。因此,as_completed
方法的使用变得不那么有用了,您应该使用完成回调函数,如以下程序所示:
import multiprocessing
import concurrent.futures
import tqdm
class BoundedQueuePoolExecutor:
    """Mixin that bounds an executor's pending-task queue with a semaphore.

    ``submit()`` blocks once the semaphore's permits are exhausted; every
    finished future gives one permit back, so at most the semaphore's
    initial value of tasks can be pending at any moment.
    """

    def __init__(self, semaphore):
        # Counting semaphore whose initial value caps pending tasks.
        self.semaphore = semaphore

    def release(self, future):
        # Done-callback: free one queue slot (the future argument is unused).
        self.semaphore.release()

    def submit(self, fn, *args, **kwargs):
        """Block until a queue slot is free, then submit *fn* to the pool."""
        self.semaphore.acquire()
        try:
            future = super().submit(fn, *args, **kwargs)
        except BaseException:
            # Bug fix: if submission fails (e.g. the pool was already shut
            # down), return the permit instead of leaking it.
            self.semaphore.release()
            raise
        future.add_done_callback(self.release)
        return future
class BoundedQueueProcessPoolExecutor(BoundedQueuePoolExecutor, concurrent.futures.ProcessPoolExecutor):
    """Process pool whose ``submit()`` blocks when too many tasks are waiting.

    At most ``pool size + max_waiting_tasks`` futures may be pending;
    ``max_waiting_tasks`` defaults to the pool size.
    """

    def __init__(self, *args, max_waiting_tasks=None, **kwargs):
        concurrent.futures.ProcessPoolExecutor.__init__(self, *args, **kwargs)
        if max_waiting_tasks is not None and max_waiting_tasks < 0:
            raise ValueError(f'Invalid negative max_waiting_tasks value: {max_waiting_tasks}')
        # Default the waiting-queue depth to the number of workers.
        queue_slots = self._max_workers if max_waiting_tasks is None else max_waiting_tasks
        BoundedQueuePoolExecutor.__init__(self, multiprocessing.BoundedSemaphore(self._max_workers + queue_slots))
def worker(x):
    """Return *x* squared after a one-second pause that simulates real work."""
    import time
    time.sleep(1)
    return x ** 2
def my_callback(future):
    """Done-callback: advance the shared progress bar and store the result."""
    bar.update()
    # This demo keeps all results; with 300K tasks you may not want to:
    results.append(future.result())
# Required for Windows (the spawn start method re-imports this module):
if __name__ == '__main__':
    params = list(range(1, 101))
    results = []
    bar = tqdm.tqdm(total=len(params))
    # submit() blocks once the bounded queue is full, so all 100 futures
    # are never alive at once.
    with BoundedQueueProcessPoolExecutor() as executor:
        for value in params:
            executor.submit(worker, value).add_done_callback(my_callback)
具有 300,000 个任务的第二个演示(结果未存储)
import concurrent.futures
import tqdm
class BoundedQueuePoolExecutor:
    """Mixin that bounds an executor's pending-task queue with a semaphore.

    ``submit()`` blocks once the semaphore's permits are exhausted; every
    finished future gives one permit back, so at most the semaphore's
    initial value of tasks can be pending at any moment.
    """

    def __init__(self, semaphore):
        # Counting semaphore whose initial value caps pending tasks.
        self.semaphore = semaphore

    def release(self, future):
        # Done-callback: free one queue slot (the future argument is unused).
        self.semaphore.release()

    def submit(self, fn, *args, **kwargs):
        """Block until a queue slot is free, then submit *fn* to the pool."""
        self.semaphore.acquire()
        try:
            future = super().submit(fn, *args, **kwargs)
        except BaseException:
            # Bug fix: if submission fails (e.g. the pool was already shut
            # down), return the permit instead of leaking it.
            self.semaphore.release()
            raise
        future.add_done_callback(self.release)
        return future
class BoundedQueueProcessPoolExecutor(BoundedQueuePoolExecutor, concurrent.futures.ProcessPoolExecutor):
    """Process pool whose ``submit()`` blocks when too many tasks are waiting.

    At most ``pool size + max_waiting_tasks`` futures may be pending;
    ``max_waiting_tasks`` defaults to the pool size.
    """

    def __init__(self, *args, max_waiting_tasks=None, **kwargs):
        # Bug fix: this second demo never imports ``multiprocessing`` at the
        # top of the script, so the original raised NameError below.
        import multiprocessing
        concurrent.futures.ProcessPoolExecutor.__init__(self, *args, **kwargs)
        if max_waiting_tasks is None:
            max_waiting_tasks = self._max_workers
        elif max_waiting_tasks < 0:
            raise ValueError(f'Invalid negative max_waiting_tasks value: {max_waiting_tasks}')
        # Permit pool-size running tasks plus max_waiting_tasks queued ones.
        BoundedQueuePoolExecutor.__init__(self, multiprocessing.BoundedSemaphore(self._max_workers + max_waiting_tasks))
def worker(x):
    """Return the square of *x* (a deliberately trivial CPU task)."""
    return x ** 2
def my_callback(future):
    """Done-callback: tick the shared progress bar by one."""
    bar.update()
# Required for Windows (the spawn start method re-imports this module):
if __name__ == '__main__':
    params = range(1, 300_001)
    # Unlike the first demo, no results are kept -- the callback only
    # advances the progress bar.  (Removed the original's unused
    # ``results = []``, which contradicted the "results not stored" note.)
    bar = tqdm.tqdm(total=300_000)
    with BoundedQueueProcessPoolExecutor() as executor:
        for x in params:
            future = executor.submit(worker, x)
            future.add_done_callback(my_callback)
备注
这些演示的任务本身都太轻量,实际上并不适合多处理,也就是说它们并不会比对应的串行程序运行得更快。
几天前,我问过针对我的特定问题进行多处理的最佳方法是:
基于此我得到以下信息:
# Size the pool to the requested number of cores (project helper).
nparallel = set_number_of_cores(nparallel)
pool = concurrent.futures.ProcessPoolExecutor(max_workers=nparallel)
# Build one argument tuple per PDB id to be relaxed.
# NOTE(review): with ~300K ids this list itself is large — presumably fine,
# but the submit loop below is where the hang is reported.
args = [(f"{path_in}/{pdb_id}.pdb", f"{path_calc}/{pdb_id}", path_FFT, path_finished,
        path_failed, LOG, remove_path_tmp_on_finish) for pdb_id in ids_to_run]
# Submitting every task up front creates ~300K Future objects at once.
tasks = [pool.submit(relax_fnc,*arg) for arg in args]
# Progress bar advances in task-completion order.
for task in tqdm.tqdm(concurrent.futures.as_completed(tasks), total=len(tasks),
                      smoothing=0.0, desc='Energy Minimization on Folder'):
    pass
当我在较小的样本上测试时,这个方法看起来不错。但现在我正试图运行它来解决实际问题,结果程序一直挂在:
tasks = [pool.submit(relax_fnc,*arg) for arg in args]
我想问题是我提交了太多作业。有什么方法可以轻松解决这个问题吗?我想我可以创建一个初始作业池,该作业池可能是我使用的进程数的 10 倍,然后在它们完成时动态提交更多任务,但这看起来不太漂亮。
所以我希望其他人有更好的解决方案。
(我目前要运行的大约有 30 万个任务)
以下 class、BoundedQueueProcessPoolExecutor
将默认确保处理池的任务队列不会有超过 N
个任务排队等待 运行,其中默认情况下 N
是池大小(但您可以使用 max_waiting_tasks 关键字参数初始化此 class 的实例并指定任何正整数) .
因此,这 class 会阻止您或多或少地同时创建所有未来实例,但仅在执行继续和任务终止时才创建。因此,as_completed
方法的使用变得不那么有用了,您应该使用完成回调函数,如以下程序所示:
import multiprocessing
import concurrent.futures
import tqdm
class BoundedQueuePoolExecutor:
    """Mixin that bounds an executor's pending-task queue with a semaphore.

    ``submit()`` blocks once the semaphore's permits are exhausted; every
    finished future gives one permit back, so at most the semaphore's
    initial value of tasks can be pending at any moment.
    """

    def __init__(self, semaphore):
        # Counting semaphore whose initial value caps pending tasks.
        self.semaphore = semaphore

    def release(self, future):
        # Done-callback: free one queue slot (the future argument is unused).
        self.semaphore.release()

    def submit(self, fn, *args, **kwargs):
        """Block until a queue slot is free, then submit *fn* to the pool."""
        self.semaphore.acquire()
        try:
            future = super().submit(fn, *args, **kwargs)
        except BaseException:
            # Bug fix: if submission fails (e.g. the pool was already shut
            # down), return the permit instead of leaking it.
            self.semaphore.release()
            raise
        future.add_done_callback(self.release)
        return future
class BoundedQueueProcessPoolExecutor(BoundedQueuePoolExecutor, concurrent.futures.ProcessPoolExecutor):
    """Process pool whose ``submit()`` blocks when too many tasks are waiting.

    At most ``pool size + max_waiting_tasks`` futures may be pending;
    ``max_waiting_tasks`` defaults to the pool size.
    """

    def __init__(self, *args, max_waiting_tasks=None, **kwargs):
        concurrent.futures.ProcessPoolExecutor.__init__(self, *args, **kwargs)
        if max_waiting_tasks is not None and max_waiting_tasks < 0:
            raise ValueError(f'Invalid negative max_waiting_tasks value: {max_waiting_tasks}')
        # Default the waiting-queue depth to the number of workers.
        queue_slots = self._max_workers if max_waiting_tasks is None else max_waiting_tasks
        BoundedQueuePoolExecutor.__init__(self, multiprocessing.BoundedSemaphore(self._max_workers + queue_slots))
def worker(x):
    """Return *x* squared after a one-second pause that simulates real work."""
    import time
    time.sleep(1)
    return x ** 2
def my_callback(future):
    """Done-callback: advance the shared progress bar and store the result."""
    bar.update()
    # This demo keeps all results; with 300K tasks you may not want to:
    results.append(future.result())
# Required for Windows (the spawn start method re-imports this module):
if __name__ == '__main__':
    params = list(range(1, 101))
    results = []
    bar = tqdm.tqdm(total=len(params))
    # submit() blocks once the bounded queue is full, so all 100 futures
    # are never alive at once.
    with BoundedQueueProcessPoolExecutor() as executor:
        for value in params:
            executor.submit(worker, value).add_done_callback(my_callback)
具有 300,000 个任务的第二个演示(结果未存储)
import concurrent.futures
import tqdm
class BoundedQueuePoolExecutor:
    """Mixin that bounds an executor's pending-task queue with a semaphore.

    ``submit()`` blocks once the semaphore's permits are exhausted; every
    finished future gives one permit back, so at most the semaphore's
    initial value of tasks can be pending at any moment.
    """

    def __init__(self, semaphore):
        # Counting semaphore whose initial value caps pending tasks.
        self.semaphore = semaphore

    def release(self, future):
        # Done-callback: free one queue slot (the future argument is unused).
        self.semaphore.release()

    def submit(self, fn, *args, **kwargs):
        """Block until a queue slot is free, then submit *fn* to the pool."""
        self.semaphore.acquire()
        try:
            future = super().submit(fn, *args, **kwargs)
        except BaseException:
            # Bug fix: if submission fails (e.g. the pool was already shut
            # down), return the permit instead of leaking it.
            self.semaphore.release()
            raise
        future.add_done_callback(self.release)
        return future
class BoundedQueueProcessPoolExecutor(BoundedQueuePoolExecutor, concurrent.futures.ProcessPoolExecutor):
    """Process pool whose ``submit()`` blocks when too many tasks are waiting.

    At most ``pool size + max_waiting_tasks`` futures may be pending;
    ``max_waiting_tasks`` defaults to the pool size.
    """

    def __init__(self, *args, max_waiting_tasks=None, **kwargs):
        # Bug fix: this second demo never imports ``multiprocessing`` at the
        # top of the script, so the original raised NameError below.
        import multiprocessing
        concurrent.futures.ProcessPoolExecutor.__init__(self, *args, **kwargs)
        if max_waiting_tasks is None:
            max_waiting_tasks = self._max_workers
        elif max_waiting_tasks < 0:
            raise ValueError(f'Invalid negative max_waiting_tasks value: {max_waiting_tasks}')
        # Permit pool-size running tasks plus max_waiting_tasks queued ones.
        BoundedQueuePoolExecutor.__init__(self, multiprocessing.BoundedSemaphore(self._max_workers + max_waiting_tasks))
def worker(x):
    """Return the square of its argument (a deliberately trivial CPU task)."""
    return x ** 2
def my_callback(future):
    """Done-callback: tick the shared progress bar by one."""
    bar.update()
# Required for Windows (the spawn start method re-imports this module):
if __name__ == '__main__':
    params = range(1, 300_001)
    # Unlike the first demo, no results are kept -- the callback only
    # advances the progress bar.  (Removed the original's unused
    # ``results = []``, which contradicted the "results not stored" note.)
    bar = tqdm.tqdm(total=300_000)
    with BoundedQueueProcessPoolExecutor() as executor:
        for x in params:
            future = executor.submit(worker, x)
            future.add_done_callback(my_callback)
备注
这些演示的任务本身都太轻量,实际上并不适合多处理,也就是说它们并不会比对应的串行程序运行得更快。