如何 运行 嵌套的、分层的 pathos 多处理映射?

How to run nested, hierarchical pathos multiprocessing maps?

在 dill serialization/pickling 上构建了我的代码的重要部分,我还尝试使用 pathos 多处理来并行化我的计算。 Pathos 它是莳萝的自然延伸。

尝试运行嵌套时

from pathos.multiprocessing import ProcessingPool
ProcessingPool().map(fn, args)

在另一个 ProcessingPool().map 中,然后我收到:

AssertionError: daemonic processes are not allowed to have children

例如:

from pathos.multiprocessing import ProcessingPool

def triple(x):
    return 3*x

def refork(x):
    from pathos.multiprocessing import ProcessingPool
    return ProcessingPool().map(triple, xrange(5))

ProcessingPool().map(refork, xrange(3))

产量

AssertionError: daemonic processes are not allowed to have children

我尝试使用 amap(...).get() 但没有成功。这是在 pathos 0.2.0.

允许嵌套并行化的最佳方法是什么?

更新

在这一点上我必须诚实,并承认我已经从 pathos 中删除了断言 "daemonic processes are not allowed to have children"。我还构建了一些东西,可以将 KeyboardInterrupt 级联到工人和那些工人......下面的部分解决方案:

def run_parallel(exec_func, exec_args, num_workers_i)
    pool = ProcessingPool(num_workers_i)
    pool.restart(force=True)
    pid_is = pool.map(get_pid_i, xrange(num_workers_i))
    try:
        results = pool.amap(
            exec_func,
            exec_args,
        )
        counter_i = 0
        while not results.ready():
            sleep(2)
            if counter_i % 60 == 0:
                print('Waiting for children running in pool.amap() with PIDs: {}'.format(pid_is))
            counter_i += 1
        results = results.get()
        pool.close()
        pool.join()
    except KeyboardInterrupt:
        print('Ctrl+C received, attempting to terminate pool...')
        hard_kill_pool(pid_is, pool)  # sending Ctrl+C
        raise
    except:
        print('Attempting to close parallel after exception: {}'.format(sys.exc_info()[0]))
        cls.hard_kill_pool(pid_is, pool)  # sending Ctrl+C
        raise


def hard_kill_pool(pid_is, pool):
    for pid_i in pid_is:
        os.kill(pid_i, signal.SIGINT)  # sending Ctrl+C
    pool.terminate()

似乎可以在控制台和 IPython 笔记本(带停止按钮)上工作,但不确定它在所有极端情况下是否 100% 正确。

我遇到了完全相同的问题。在我的例子中,内部操作是需要并行性的,所以我做了 ThreadingPoolProcessingPool。这是你的例子:

from pathos.multiprocessing import ProcessingPool, ThreadingPool

def triple(x):
    return 3*x

def refork(x):
    from pathos.multiprocessing import ProcessingPool
    return ProcessingPool().map(triple, xrange(5))

ThreadingPool().map(refork, xrange(3))

你甚至可以让另一层有另一个外线程池。根据您的情况,您可以颠倒这些池的顺序。但是,您不能拥有流程的流程。如果确实需要,请参阅:。我自己还没有尝试过,所以我不能详细说明。