如何 运行 嵌套的、分层的 pathos 多处理映射?
How to run nested, hierarchical pathos multiprocessing maps?
在 dill serialization/pickling 上构建了我的代码的重要部分,我还尝试使用 pathos 多处理来并行化我的计算。 Pathos 它是莳萝的自然延伸。
尝试运行嵌套时
from pathos.multiprocessing import ProcessingPool
ProcessingPool().map(fn, args)
在另一个 ProcessingPool().map
中,然后我收到:
AssertionError: daemonic processes are not allowed to have children
例如:
from pathos.multiprocessing import ProcessingPool
def triple(x):
return 3*x
def refork(x):
from pathos.multiprocessing import ProcessingPool
return ProcessingPool().map(triple, xrange(5))
ProcessingPool().map(refork, xrange(3))
产量
AssertionError: daemonic processes are not allowed to have children
我尝试使用 amap(...).get()
但没有成功。这是在 pathos 0.2.0.
允许嵌套并行化的最佳方法是什么?
更新
在这一点上我必须诚实,并承认我已经从 pathos 中删除了断言 "daemonic processes are not allowed to have children"
。我还构建了一些东西,可以将 KeyboardInterrupt
级联到工人和那些工人......下面的部分解决方案:
def run_parallel(exec_func, exec_args, num_workers_i)
pool = ProcessingPool(num_workers_i)
pool.restart(force=True)
pid_is = pool.map(get_pid_i, xrange(num_workers_i))
try:
results = pool.amap(
exec_func,
exec_args,
)
counter_i = 0
while not results.ready():
sleep(2)
if counter_i % 60 == 0:
print('Waiting for children running in pool.amap() with PIDs: {}'.format(pid_is))
counter_i += 1
results = results.get()
pool.close()
pool.join()
except KeyboardInterrupt:
print('Ctrl+C received, attempting to terminate pool...')
hard_kill_pool(pid_is, pool) # sending Ctrl+C
raise
except:
print('Attempting to close parallel after exception: {}'.format(sys.exc_info()[0]))
cls.hard_kill_pool(pid_is, pool) # sending Ctrl+C
raise
def hard_kill_pool(pid_is, pool):
for pid_i in pid_is:
os.kill(pid_i, signal.SIGINT) # sending Ctrl+C
pool.terminate()
似乎可以在控制台和 IPython 笔记本(带停止按钮)上工作,但不确定它在所有极端情况下是否 100% 正确。
我遇到了完全相同的问题。在我的例子中,内部操作是需要并行性的,所以我做了 ThreadingPool
的 ProcessingPool
。这是你的例子:
from pathos.multiprocessing import ProcessingPool, ThreadingPool
def triple(x):
return 3*x
def refork(x):
from pathos.multiprocessing import ProcessingPool
return ProcessingPool().map(triple, xrange(5))
ThreadingPool().map(refork, xrange(3))
你甚至可以让另一层有另一个外线程池。根据您的情况,您可以颠倒这些池的顺序。但是,您不能拥有流程的流程。如果确实需要,请参阅:。我自己还没有尝试过,所以我不能详细说明。
在 dill serialization/pickling 上构建了我的代码的重要部分,我还尝试使用 pathos 多处理来并行化我的计算。 Pathos 它是莳萝的自然延伸。
尝试运行嵌套时
from pathos.multiprocessing import ProcessingPool
ProcessingPool().map(fn, args)
在另一个 ProcessingPool().map
中,然后我收到:
AssertionError: daemonic processes are not allowed to have children
例如:
from pathos.multiprocessing import ProcessingPool
def triple(x):
return 3*x
def refork(x):
from pathos.multiprocessing import ProcessingPool
return ProcessingPool().map(triple, xrange(5))
ProcessingPool().map(refork, xrange(3))
产量
AssertionError: daemonic processes are not allowed to have children
我尝试使用 amap(...).get()
但没有成功。这是在 pathos 0.2.0.
允许嵌套并行化的最佳方法是什么?
更新
在这一点上我必须诚实,并承认我已经从 pathos 中删除了断言 "daemonic processes are not allowed to have children"
。我还构建了一些东西,可以将 KeyboardInterrupt
级联到工人和那些工人......下面的部分解决方案:
def run_parallel(exec_func, exec_args, num_workers_i)
pool = ProcessingPool(num_workers_i)
pool.restart(force=True)
pid_is = pool.map(get_pid_i, xrange(num_workers_i))
try:
results = pool.amap(
exec_func,
exec_args,
)
counter_i = 0
while not results.ready():
sleep(2)
if counter_i % 60 == 0:
print('Waiting for children running in pool.amap() with PIDs: {}'.format(pid_is))
counter_i += 1
results = results.get()
pool.close()
pool.join()
except KeyboardInterrupt:
print('Ctrl+C received, attempting to terminate pool...')
hard_kill_pool(pid_is, pool) # sending Ctrl+C
raise
except:
print('Attempting to close parallel after exception: {}'.format(sys.exc_info()[0]))
cls.hard_kill_pool(pid_is, pool) # sending Ctrl+C
raise
def hard_kill_pool(pid_is, pool):
for pid_i in pid_is:
os.kill(pid_i, signal.SIGINT) # sending Ctrl+C
pool.terminate()
似乎可以在控制台和 IPython 笔记本(带停止按钮)上工作,但不确定它在所有极端情况下是否 100% 正确。
我遇到了完全相同的问题。在我的例子中,内部操作是需要并行性的,所以我做了 ThreadingPool
的 ProcessingPool
。这是你的例子:
from pathos.multiprocessing import ProcessingPool, ThreadingPool
def triple(x):
return 3*x
def refork(x):
from pathos.multiprocessing import ProcessingPool
return ProcessingPool().map(triple, xrange(5))
ThreadingPool().map(refork, xrange(3))
你甚至可以让另一层有另一个外线程池。根据您的情况,您可以颠倒这些池的顺序。但是,您不能拥有流程的流程。如果确实需要,请参阅:。我自己还没有尝试过,所以我不能详细说明。