使用初始化启动 concurrent.futures.ProcessPoolExecutor?
Launch concurrent.futures.ProcessPoolExecutor with initialization?
我打算使用 concurrent.futures.ProcessPoolExecutor
来并行执行函数。根据documentation,其executor
object只能接受map
中的一个简单函数。我的实际情况涉及 'to-be-parallelized' 函数执行之前的初始化(加载数据)。我该如何安排?
'to-be-parallelized'函数在迭代中被多次调用。我不希望它每次都是 re-initialized。
换句话说,有一个 init
函数可以为这个 tbp 函数产生一些输出。每个 child 都应该有自己的输出副本,因为函数依赖于它。
如果使用 Python 3.7 或更高版本,请使用 。此答案仅与早期 Python 版本相关,其中 concurrent.futures
不支持传递 initializer
函数。
听起来您正在寻找 initializer
/initargs
选项的等效项,multiprocessing.Pool
takes. Currently, that behavior doesn't exist for concurrent.futures.ProcessPoolExecutor
, though there is a patch waiting for review 添加了该行为。
因此,您可以使用 multiprocessing.Pool
(这可能适合您的用例),等待该补丁合并和发布(您可能需要等待一段时间 :)),或者推出您自己的补丁解决方案。事实证明,为带有 initializer
的 map 编写包装函数并不难,但每个进程只调用它一个:
from concurrent.futures import ProcessPoolExecutor
from functools import partial
inited = False
initresult = None
def initwrapper(initfunc, initargs, f, x):
# This will be called in the child. inited
# Will be False the first time its called, but then
# remain True every other time its called in a given
# worker process.
global inited, initresult
if not inited:
inited = True
initresult = initfunc(*initargs)
return f(x)
def do_init(a,b):
print('ran init {} {}'.format(a,b))
return os.getpid() # Just to demonstrate it will be unique per process
def f(x):
print("Hey there {}".format(x))
print('initresult is {}'.format(initresult))
return x+1
def initmap(executor, initializer, initargs, f, it):
return executor.map(partial(initwrapper, initializer, initargs, f), it)
if __name__ == "__main__":
with ProcessPoolExecutor(4) as executor:
out = initmap(executor, do_init, (5,6), f, range(10))
print(list(out))
输出:
ran init 5 6
Hey there 0
initresult is 4568
ran init 5 6
Hey there 1
initresult is 4569
ran init 5 6
Hey there 2
initresult is 4570
Hey there 3
initresult is 4569
Hey there 4
initresult is 4568
ran init 5 6
Hey there 5
initresult is 4571
Hey there 6
initresult is 4570
Hey there 7
initresult is 4569
Hey there 8
initresult is 4568
Hey there 9
initresult is 4570
[1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
从 Python 3.7
开始,ThreadPoolExecutor
和 ProcessPoolExecutor
都有可选的 initializer
和 initargs
参数。每个thread/process开始后都会调用initializer(*initargs)
。
参见https://docs.python.org/3.7/library/concurrent.futures.html。
我打算使用 concurrent.futures.ProcessPoolExecutor
来并行执行函数。根据documentation,其executor
object只能接受map
中的一个简单函数。我的实际情况涉及 'to-be-parallelized' 函数执行之前的初始化(加载数据)。我该如何安排?
'to-be-parallelized'函数在迭代中被多次调用。我不希望它每次都是 re-initialized。
换句话说,有一个 init
函数可以为这个 tbp 函数产生一些输出。每个 child 都应该有自己的输出副本,因为函数依赖于它。
如果使用 Python 3.7 或更高版本,请使用 concurrent.futures
不支持传递 initializer
函数。
听起来您正在寻找 initializer
/initargs
选项的等效项,multiprocessing.Pool
takes. Currently, that behavior doesn't exist for concurrent.futures.ProcessPoolExecutor
, though there is a patch waiting for review 添加了该行为。
因此,您可以使用 multiprocessing.Pool
(这可能适合您的用例),等待该补丁合并和发布(您可能需要等待一段时间 :)),或者推出您自己的补丁解决方案。事实证明,为带有 initializer
的 map 编写包装函数并不难,但每个进程只调用它一个:
from concurrent.futures import ProcessPoolExecutor
from functools import partial
inited = False
initresult = None
def initwrapper(initfunc, initargs, f, x):
# This will be called in the child. inited
# Will be False the first time its called, but then
# remain True every other time its called in a given
# worker process.
global inited, initresult
if not inited:
inited = True
initresult = initfunc(*initargs)
return f(x)
def do_init(a,b):
print('ran init {} {}'.format(a,b))
return os.getpid() # Just to demonstrate it will be unique per process
def f(x):
print("Hey there {}".format(x))
print('initresult is {}'.format(initresult))
return x+1
def initmap(executor, initializer, initargs, f, it):
return executor.map(partial(initwrapper, initializer, initargs, f), it)
if __name__ == "__main__":
with ProcessPoolExecutor(4) as executor:
out = initmap(executor, do_init, (5,6), f, range(10))
print(list(out))
输出:
ran init 5 6
Hey there 0
initresult is 4568
ran init 5 6
Hey there 1
initresult is 4569
ran init 5 6
Hey there 2
initresult is 4570
Hey there 3
initresult is 4569
Hey there 4
initresult is 4568
ran init 5 6
Hey there 5
initresult is 4571
Hey there 6
initresult is 4570
Hey there 7
initresult is 4569
Hey there 8
initresult is 4568
Hey there 9
initresult is 4570
[1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
从 Python 3.7
开始,ThreadPoolExecutor
和 ProcessPoolExecutor
都有可选的 initializer
和 initargs
参数。每个thread/process开始后都会调用initializer(*initargs)
。
参见https://docs.python.org/3.7/library/concurrent.futures.html。