在多处理中启动嵌套进程
Launching nested processes in multiprocessing
我有一个启动多个进程的主文件,其中一个进程再次启动多个进程。我在启动嵌套进程集时遇到问题。
我在一个文件中有以下代码:
# parallel_test.py
import Queue
import multiprocessing
import time
import threading
def worker(q):
while not q.empty():
try:
row = q.get(False)
print row
time.sleep(1)
except Queue.Empty:
break
def main():
print 'creating queue'
q = multiprocessing.Queue()
print 'enqueuing'
for i in range(100):
q.put(i)
num_processes = 15
pool = []
for i in range(num_processes):
print 'launching process {0}'.format(i)
p = multiprocessing.Process(target=worker, args=(q,))
p.start()
pool.append(p)
for p in pool:
p.join()
if __name__ == '__main__':
main()
运行 这个文件单独 python parallel_test.py
工作正常并按预期打印数字。但是从另一个文件作为另一个进程启动它会导致问题。我的主文件:
# main_loop_test.py
import parallel_test
from multiprocessing import Pool
import time
def main():
targets = [parallel_test.main]
running = True
while running:
try:
p = Pool(12)
for target in targets:
p.apply_async(target)
p.close() # For some reason you need to run close() before join()
p.join() # What for all the steps to be done
print 'All steps done'
time.sleep(2)
except KeyboardInterrupt as e:
print "<<<<<<<<<<<<<<<<<<CAUGHT KEYBOARD INTERRUPT FROM USER>>>>>>>>>>>>>>>>>>>"
running = False
if __name__ == '__main__':
main()
它 parallel_test.py
似乎尝试启动一个进程(什么都不做)然后退出该函数并 main_loop_test.py
打印 'All steps done'。没有数字被打印出来。输出:
creating queue
enqueuing
launching process 0
All steps done
creating queue
enqueuing
launching process 0
All steps done
怎么了?我使用 Pool
遇到了同样的问题,而不是在 parallel_test.py
中自己管理流程。不过,用线程替换多处理是可行的。
当您从另一个程序将它作为子进程调用时,您无法从 parallel_test 创建子进程,因为该进程被创建为守护进程,并且如 link https://docs.python.org/2/library/multiprocessing.html ,守护进程不允许创建子进程。您必须通过将进程的守护进程 属性 设置为 false 来将进程创建为非守护进程,如下所示。
p = multiprocessing.Process(target=test.main)
p.daemon = False
p.start()
p.join()
当您通过 Pool 模块创建子进程时,我不确定如何设置守护进程 属性。您可以尝试通过初始化列表传递此 属性。
您的意思是通过 multiprocessing
模块使用分层并行性,就像这个显示在异步处理映射中执行的阻塞处理映射的示例?
>>> def squared(x):
... return x**2
...
>>> def triple(x):
... return 3*x
...
>>> from pathos.multiprocessing import ProcessingPool as PPool
>>> res = PPool().amap(triple, PPool().map(squared, xrange(10)))
>>> res.get()
[0, 3, 12, 27, 48, 75, 108, 147, 192, 243]
我正在使用 multiprocessing
的 pathos
分支,因为它比标准库版本更容易使用。
另请参阅此处的另一个示例:
编辑:上面的问题被删除了......所以我在link这里添加了以前的内容:
下面介绍如何使用两种不同类型的池进行嵌套并行处理。
>>> from pathos.multiprocessing import ProcessingPool, ThreadingPool
>>> # build a non-blocking processing pool map (i.e. async_map)
>>> amap = ProcessingPool().amap
>>> # build a blocking thread pool map
>>> tmap = ThreadingPool().map
>>>
>>> # define an 'inner' function
>>> def g(x):
... import random
... return int(x * random.random())
...
>>> # parallelize the inner function
>>> def h(x):
... return sum(tmap(g, x))
...
>>> # define the 'outer' function
>>> def f(x,y):
... return x*y
...
>>> # define two lists of different lengths
>>> x = range(10)
>>> y = range(5)
>>>
>>> # evaluate in nested parallel (done several times, for effect)
>>> res1 = amap(f, [h(x),h(x),h(x),h(x),h(x)], y)
>>> res2 = amap(f, [h(x),h(x),h(x),h(x),h(x)], y)
>>> res3 = amap(f, [h(x),h(x),h(x),h(x),h(x)], y)
>>> res4 = amap(f, [h(x),h(x),h(x),h(x),h(x)], y)
>>> res1.get()
[0, 15, 34, 57, 64]
>>> res2.get()
[0, 21, 36, 57, 64]
>>> res3.get()
[0, 10, 40, 51, 68]
>>> res4.get()
[0, 28, 22, 39, 116]
注意,我没有故意使用 [h(x)]*len(y)
,因为那样只会调用 h
一次;但是也可以这样称呼:
>>> def _f(m, g, x, y):
... return sum(m(g, x)) * y
...
>>> amap(_f, [tmap]*len(y), [g]*len(y), [x]*len(y), y).get()
[0, 26, 42, 57, 68]
>>> amap(_f, [tmap]*len(y), [g]*len(y), [x]*len(y), y).get()
[0, 20, 50, 78, 92]
>>> amap(_f, [tmap]*len(y), [g]*len(y), [x]*len(y), y).get()
[0, 12, 20, 51, 92]
您可以在此处获取 pathos
:https://github.com/uqfoundation
我有一个启动多个进程的主文件,其中一个进程再次启动多个进程。我在启动嵌套进程集时遇到问题。
我在一个文件中有以下代码:
# parallel_test.py
import Queue
import multiprocessing
import time
import threading
def worker(q):
while not q.empty():
try:
row = q.get(False)
print row
time.sleep(1)
except Queue.Empty:
break
def main():
print 'creating queue'
q = multiprocessing.Queue()
print 'enqueuing'
for i in range(100):
q.put(i)
num_processes = 15
pool = []
for i in range(num_processes):
print 'launching process {0}'.format(i)
p = multiprocessing.Process(target=worker, args=(q,))
p.start()
pool.append(p)
for p in pool:
p.join()
if __name__ == '__main__':
main()
运行 这个文件单独 python parallel_test.py
工作正常并按预期打印数字。但是从另一个文件作为另一个进程启动它会导致问题。我的主文件:
# main_loop_test.py
import parallel_test
from multiprocessing import Pool
import time
def main():
targets = [parallel_test.main]
running = True
while running:
try:
p = Pool(12)
for target in targets:
p.apply_async(target)
p.close() # For some reason you need to run close() before join()
p.join() # What for all the steps to be done
print 'All steps done'
time.sleep(2)
except KeyboardInterrupt as e:
print "<<<<<<<<<<<<<<<<<<CAUGHT KEYBOARD INTERRUPT FROM USER>>>>>>>>>>>>>>>>>>>"
running = False
if __name__ == '__main__':
main()
它 parallel_test.py
似乎尝试启动一个进程(什么都不做)然后退出该函数并 main_loop_test.py
打印 'All steps done'。没有数字被打印出来。输出:
creating queue
enqueuing
launching process 0
All steps done
creating queue
enqueuing
launching process 0
All steps done
怎么了?我使用 Pool
遇到了同样的问题,而不是在 parallel_test.py
中自己管理流程。不过,用线程替换多处理是可行的。
当您从另一个程序将它作为子进程调用时,您无法从 parallel_test 创建子进程,因为该进程被创建为守护进程,并且如 link https://docs.python.org/2/library/multiprocessing.html ,守护进程不允许创建子进程。您必须通过将进程的守护进程 属性 设置为 false 来将进程创建为非守护进程,如下所示。
p = multiprocessing.Process(target=test.main)
p.daemon = False
p.start()
p.join()
当您通过 Pool 模块创建子进程时,我不确定如何设置守护进程 属性。您可以尝试通过初始化列表传递此 属性。
您的意思是通过 multiprocessing
模块使用分层并行性,就像这个显示在异步处理映射中执行的阻塞处理映射的示例?
>>> def squared(x):
... return x**2
...
>>> def triple(x):
... return 3*x
...
>>> from pathos.multiprocessing import ProcessingPool as PPool
>>> res = PPool().amap(triple, PPool().map(squared, xrange(10)))
>>> res.get()
[0, 3, 12, 27, 48, 75, 108, 147, 192, 243]
我正在使用 multiprocessing
的 pathos
分支,因为它比标准库版本更容易使用。
另请参阅此处的另一个示例:
编辑:上面的问题被删除了......所以我在link这里添加了以前的内容: 下面介绍如何使用两种不同类型的池进行嵌套并行处理。
>>> from pathos.multiprocessing import ProcessingPool, ThreadingPool
>>> # build a non-blocking processing pool map (i.e. async_map)
>>> amap = ProcessingPool().amap
>>> # build a blocking thread pool map
>>> tmap = ThreadingPool().map
>>>
>>> # define an 'inner' function
>>> def g(x):
... import random
... return int(x * random.random())
...
>>> # parallelize the inner function
>>> def h(x):
... return sum(tmap(g, x))
...
>>> # define the 'outer' function
>>> def f(x,y):
... return x*y
...
>>> # define two lists of different lengths
>>> x = range(10)
>>> y = range(5)
>>>
>>> # evaluate in nested parallel (done several times, for effect)
>>> res1 = amap(f, [h(x),h(x),h(x),h(x),h(x)], y)
>>> res2 = amap(f, [h(x),h(x),h(x),h(x),h(x)], y)
>>> res3 = amap(f, [h(x),h(x),h(x),h(x),h(x)], y)
>>> res4 = amap(f, [h(x),h(x),h(x),h(x),h(x)], y)
>>> res1.get()
[0, 15, 34, 57, 64]
>>> res2.get()
[0, 21, 36, 57, 64]
>>> res3.get()
[0, 10, 40, 51, 68]
>>> res4.get()
[0, 28, 22, 39, 116]
注意,我没有故意使用 [h(x)]*len(y)
,因为那样只会调用 h
一次;但是也可以这样称呼:
>>> def _f(m, g, x, y):
... return sum(m(g, x)) * y
...
>>> amap(_f, [tmap]*len(y), [g]*len(y), [x]*len(y), y).get()
[0, 26, 42, 57, 68]
>>> amap(_f, [tmap]*len(y), [g]*len(y), [x]*len(y), y).get()
[0, 20, 50, 78, 92]
>>> amap(_f, [tmap]*len(y), [g]*len(y), [x]*len(y), y).get()
[0, 12, 20, 51, 92]
您可以在此处获取 pathos
:https://github.com/uqfoundation