与 ipyparallel 集群共享队列

Sharing queues with ipyparallel cluster

我正在尝试将 ipyparallel 作为 multiprocessing 的替代方案集成到我的主从架构中。

即目前所有进程都有两个队列:

目前我使用 multiprocessing.Manager().Queue() 队列进行通信。然而,它们似乎无法与 ipyparallel 个进程共享。

我完全这样做(而不仅仅是通过函数)的原因是 "setting up" 从头开始​​的工作人员几乎与执行计算一样昂贵(在计算方面)。我更喜欢 运行 一个函数(通过 map_async 或类似函数),它可以在工作人员上设置环境,执行第一次计算,将结果推送到结果队列,然后获取(明显更小) 从任务队列更新并重复最后几个步骤直到停止(再次通过队列)。

如果有更好的方法/框架来完成此类任务(但是必须 python),我洗耳恭听。

谢谢

使用 IPython 并行,通常 "setup" 使用 DirectView,然后分发依赖于该设置的较小任务作为传递给负载平衡视图的函数。

设置您的客户端和视图:

import ipyparallel as ipp

rc = ipp.Client()
dview = rc[:]
lbview = rc.load_balanced_view()

使用直接视图进行设置:

dview.execute("data = setup()")

您现在可以在您的任务中使用 ipp.Reference:

def task(data):
    analyze(data)

rdata = ipp.Reference('data')
ar = view.apply(task, rdata)
result = ar.get()

通过这种方式,您可以在任何地方进行一次设置,然后 运行 以负载均衡方式依赖于该设置的任务。