与 ipyparallel 集群共享队列
Sharing queues with ipyparallel cluster
我正在尝试将 ipyparallel
作为 multiprocessing
的替代方案集成到我的主从架构中。
即目前所有进程都有两个队列:
- 一个用于从主到从的任务
- 一个用于从奴隶到主人的结果。
目前我使用 multiprocessing.Manager().Queue()
队列进行通信。然而,它们似乎无法与 ipyparallel
个进程共享。
我完全这样做(而不仅仅是通过函数)的原因是 "setting up" 从头开始的工作人员几乎与执行计算一样昂贵(在计算方面)。我更喜欢 运行 一个函数(通过 map_async
或类似函数),它可以在工作人员上设置环境,执行第一次计算,将结果推送到结果队列,然后获取(明显更小) 从任务队列更新并重复最后几个步骤直到停止(再次通过队列)。
如果有更好的方法/框架来完成此类任务(但是必须 python),我洗耳恭听。
谢谢
使用 IPython 并行,通常 "setup" 使用 DirectView,然后分发依赖于该设置的较小任务作为传递给负载平衡视图的函数。
设置您的客户端和视图:
import ipyparallel as ipp
rc = ipp.Client()
dview = rc[:]
lbview = rc.load_balanced_view()
使用直接视图进行设置:
dview.execute("data = setup()")
您现在可以在您的任务中使用 ipp.Reference
:
def task(data):
analyze(data)
rdata = ipp.Reference('data')
ar = view.apply(task, rdata)
result = ar.get()
通过这种方式,您可以在任何地方进行一次设置,然后 运行 以负载均衡方式依赖于该设置的任务。
我正在尝试将 ipyparallel
作为 multiprocessing
的替代方案集成到我的主从架构中。
即目前所有进程都有两个队列:
- 一个用于从主到从的任务
- 一个用于从奴隶到主人的结果。
目前我使用 multiprocessing.Manager().Queue()
队列进行通信。然而,它们似乎无法与 ipyparallel
个进程共享。
我完全这样做(而不仅仅是通过函数)的原因是 "setting up" 从头开始的工作人员几乎与执行计算一样昂贵(在计算方面)。我更喜欢 运行 一个函数(通过 map_async
或类似函数),它可以在工作人员上设置环境,执行第一次计算,将结果推送到结果队列,然后获取(明显更小) 从任务队列更新并重复最后几个步骤直到停止(再次通过队列)。
如果有更好的方法/框架来完成此类任务(但是必须 python),我洗耳恭听。
谢谢
使用 IPython 并行,通常 "setup" 使用 DirectView,然后分发依赖于该设置的较小任务作为传递给负载平衡视图的函数。
设置您的客户端和视图:
import ipyparallel as ipp
rc = ipp.Client()
dview = rc[:]
lbview = rc.load_balanced_view()
使用直接视图进行设置:
dview.execute("data = setup()")
您现在可以在您的任务中使用 ipp.Reference
:
def task(data):
analyze(data)
rdata = ipp.Reference('data')
ar = view.apply(task, rdata)
result = ar.get()
通过这种方式,您可以在任何地方进行一次设置,然后 运行 以负载均衡方式依赖于该设置的任务。