pathos pools:在 N 个任务后更新工作进程
pathos pools: Renew worker processes after N tasks
我正在构建一个并行 python 应用程序,它实质上是围绕外部库调用 C 包装器。需要并行性才能 运行 在所有 CPU 核心上同时进行计算。
我最终使用了 pathos.multiprocessing.ProcessPool
,但是这些池缺少标准 multiprocessing.Pool
class 构造函数 (see reference here ) 的 maxtaskperchild
参数。我需要此功能,因为 C 库依赖于进程时钟来定义一些执行时间限制,这些限制最终会在任务堆积时达到。
有没有办法让 ProcessPool
经理在给定数量的任务后更新工作进程?
阐明我意图的示例代码:
from pathos.pools import ProcessPool
from os import getpid
import collections
def print_pid(task_id):
pid = getpid()
return pid
if __name__ == "__main__":
NUM_TASKS = 50
MAX_PER_CHILD = 2
# limit each process to maximum MAX_PER_CHILD tasks
# we would like the pool to exit the process and spawn a new one
# when a task counter reaches the limit
# below argument 'maxtasksperchild' would work with standard 'multiprocessing'
pool = ProcessPool(ncpu=2, maxtasksperchild=MAX_PER_CHILD)
results = pool.map(print_pid, range(NUM_TASKS), chunksize=1)
tasks_per_pid = dict(collections.Counter(results))
print(tasks_per_pid)
# printed result
# {918: 8, 919: 6, 920: 6, 921: 6, 922: 6, 923: 6, 924: 6, 925: 6}
# observe that all processes did more than MAX_PER_CHILD tasks
我试过的
- 在
ProcessPool
构造函数中设置 maxtasksperchild
(参见上面的简单示例)似乎没有做任何事情
- 在辅助函数中调用
sys.exit()
使程序挂起
- 我在深入研究源代码时发现了一些提示
在 pathos.multiprocessing
中有两个池:
ProcessPool
和 _ProcessPool
。前者旨在拥有一个增强的池生命周期,最大限度地减少启动时间,并具有持久性和重新启动功能——但是,缺少一些“multiprocessing
”关键字。后者 (_ProcessPool
) 是 API 向下设计的一级,并提供与 multiprocessing
Pool
接口相同的接口(但使用 dill
) .所以,看看 _ProcessPool
.
我正在构建一个并行 python 应用程序,它实质上是围绕外部库调用 C 包装器。需要并行性才能 运行 在所有 CPU 核心上同时进行计算。
我最终使用了 pathos.multiprocessing.ProcessPool
,但是这些池缺少标准 multiprocessing.Pool
class 构造函数 (see reference here ) 的 maxtaskperchild
参数。我需要此功能,因为 C 库依赖于进程时钟来定义一些执行时间限制,这些限制最终会在任务堆积时达到。
有没有办法让 ProcessPool
经理在给定数量的任务后更新工作进程?
阐明我意图的示例代码:
from pathos.pools import ProcessPool
from os import getpid
import collections
def print_pid(task_id):
pid = getpid()
return pid
if __name__ == "__main__":
NUM_TASKS = 50
MAX_PER_CHILD = 2
# limit each process to maximum MAX_PER_CHILD tasks
# we would like the pool to exit the process and spawn a new one
# when a task counter reaches the limit
# below argument 'maxtasksperchild' would work with standard 'multiprocessing'
pool = ProcessPool(ncpu=2, maxtasksperchild=MAX_PER_CHILD)
results = pool.map(print_pid, range(NUM_TASKS), chunksize=1)
tasks_per_pid = dict(collections.Counter(results))
print(tasks_per_pid)
# printed result
# {918: 8, 919: 6, 920: 6, 921: 6, 922: 6, 923: 6, 924: 6, 925: 6}
# observe that all processes did more than MAX_PER_CHILD tasks
我试过的
- 在
ProcessPool
构造函数中设置maxtasksperchild
(参见上面的简单示例)似乎没有做任何事情 - 在辅助函数中调用
sys.exit()
使程序挂起 - 我在深入研究源代码时发现了一些提示
在 pathos.multiprocessing
中有两个池:
ProcessPool
和 _ProcessPool
。前者旨在拥有一个增强的池生命周期,最大限度地减少启动时间,并具有持久性和重新启动功能——但是,缺少一些“multiprocessing
”关键字。后者 (_ProcessPool
) 是 API 向下设计的一级,并提供与 multiprocessing
Pool
接口相同的接口(但使用 dill
) .所以,看看 _ProcessPool
.