How to best share static data between ipyparallel client and remote engines?

I am running the same simulation in a loop with different parameters. Each simulation uses a pandas DataFrame (data) which is only read, never modified. Using ipyparallel (IPython parallel), I can put this DataFrame into the global variable space of each engine in my view before the simulations start:

view['data'] = data

The engines can then access the DataFrame for all of the simulations that get run on them. The process of copying the data (about 40 MB when pickled) takes only a few seconds. However, as the number of simulations grows, memory usage seems to grow very large. I suspect this shared data is being copied for each task rather than just once per engine. What is the best practice for sharing static read-only data from a client with engines? Copying it once per engine is acceptable, but ideally it would only be copied once per host (I have 4 engines on host1 and 8 engines on host2).
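One pattern that avoids re-sending the data with every task is to write it once to a file on a shared filesystem and have each engine load and cache it lazily. Below is only a sketch of that idea (the `get_data` helper and its module-level cache are hypothetical, not part of ipyparallel); each task would call `get_data(path)` and pay the load cost once per engine process:

```python
import pickle

# Hypothetical per-process cache: each engine loads the shared read-only
# data at most once, instead of receiving a pickled copy with every task.
_cache = {}

def get_data(path):
    """Load shared data from a pickle file on a shared filesystem, caching
    it in this process so later tasks on the same engine reuse one copy."""
    if path not in _cache:
        with open(path, "rb") as f:
            _cache[path] = pickle.load(f)
    return _cache[path]
```

On the client side you would pickle the data to the shared path once (e.g. with pd.to_pickle) and pass only the path with each task. This still gives one copy per engine, not per host; true per-host sharing would need something like a memory-mapped format, which a plain pickled DataFrame does not provide.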

Here is my code:

from ipyparallel import Client
import pandas as pd

rc = Client()
view = rc[:]  # use all engines
view.scatter('id', rc.ids, flatten=True)  # So we can track which engine performed what task

def do_simulation(tweaks):
    """ Run simulation with specified tweaks """
    #  Do sim stuff using the global data DataFrame
    return results, id, tweaks

if __name__ == '__main__':
    data = pd.read_sql("SELECT * FROM my_table", engine)  # engine: a SQLAlchemy engine defined elsewhere
    threads = []  # store list of tweaks dicts
    for i in range(4):
        for j in range(5):
            for k in range(6):
                threads.append(dict(i=i, j=j, k=k))

    # Set up globals for each engine.  This is the read-only DataFrame
    view['data'] = data
    ar = view.map_async(do_simulation, threads)

    # Our async results should pop up over time.  Let's measure our progress:
    for idx, (results, id, tweaks) in enumerate(ar):
        print('Progress: {}%: Simulation {} finished on engine {}'.format(100.0 * ar.progress / len(ar), idx, id))
        # Store results as a pickle for the future
        pfile = '{}_{}_{}.pickle'.format(tweaks['i'], tweaks['j'], tweaks['k'])
        # Save our results to a pickle file
        pd.to_pickle(results, out_file_path + pfile)

    print('Total execution time: {} (serial time: {})'.format(ar.wall_time, ar.serial_time))

If the simulation count is small (~50), it takes a while to get started, but then I start to see progress print statements. Strangely, multiple tasks get assigned to the same engine, and I don't see a response until all of that engine's assigned tasks have completed. I would expect to see a response from enumerate(ar) every time a single simulation task completes.

If the simulation count is large (~1000), it takes a long time to get started. I see the CPUs throttle up on all engines, but no progress print statements appear for a long time (~40 minutes), and when I do see progress, it appears that a large block (>100) of tasks went to the same engine, which had to complete before any progress was reported. When that engine did finish, the ar object provided a new response every 4 seconds or so; that may just be the time taken to write the output pickle files.

Finally, host1 also runs the ipcontroller task, and its memory usage climbs like crazy (the Python process shows >6 GB of RAM in use, and the kernel task shows another 3 GB). The host2 engines don't really show much memory usage at all. What would cause this spike in memory?

I used this logic in a code of mine a few years ago, and I got it working using this. My code was something like:

shared_dict = {
    # big dict with ~10k keys, each with a list of dicts
}

balancer = engines.load_balanced_view()

with engines[:].sync_imports(): # your 'view' variable 
    import pandas as pd
    import ujson as json

engines[:].push({'shared_dict': shared_dict})  # push takes a mapping of variable name -> value

results = balancer.map(lambda i: (i, my_func(i)), ids)  # ids: list of task identifiers
results_data = results.get()

If simulation counts are small (~50), then it takes a while to get started, but I start to see progress print statements. Strangely, multiple tasks will get assigned to the same engine and I don't see a response until all of those assigned tasks are completed for that engine. I would expect to see a response from enumerate(ar) every time a single simulation task completes.

In my case, my_func() is a complicated method where I append lots of log messages to a file, so I did have my print statements.

Regarding the task assignment, since I used load_balanced_view(), I just fumbled around in the library and it worked well.

If simulation counts are large (~1000), it takes a long time to get started. I see the CPUs throttle up on all engines, but no progress print statements are seen until a long time (~40 mins), and when I do see progress, it appears a large block (>100) of tasks went to the same engine, and awaited completion from that one engine before providing some progress. When that one engine did complete, I saw the ar object provided new responses every 4 secs - this may have been the time delay to write the output pickle files.

I have never experienced delays that long, so I can't really say much about that.

I hope this helps with your problem.


PS: As I said in a comment, you could also try multiprocessing.Pool. I guess I have never tried to share big, read-only data as a global variable using it, but I would give it a try, because it seems to work.
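To make that suggestion concrete, here is a minimal sketch of the multiprocessing.Pool approach (the names shared_data and simulate are made up for illustration). It relies on the POSIX 'fork' start method, where Pool workers inherit the parent's globals copy-on-write instead of receiving a pickled copy per task:

```python
import multiprocessing as mp

# Stand-in for the big read-only DataFrame; with the 'fork' start method
# (POSIX only), workers inherit this global copy-on-write, so it is not
# re-pickled for every task.
shared_data = {"rows": list(range(1000))}

def simulate(tweak):
    # Workers read the inherited global; only 'tweak' and the small
    # result cross the process boundary.
    return tweak * sum(shared_data["rows"][:10])

def run_simulations(tweaks):
    with mp.get_context("fork").Pool(2) as pool:
        return pool.map(simulate, tweaks)

if __name__ == "__main__":
    print(run_simulations([1, 2, 3]))  # -> [45, 90, 135]
```

Note that this helps with the shared input only; each task's result is still pickled back to the parent, so large outputs cost the same as before.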

Sometimes you need to scatter your data grouped by a category, to ensure that each subgroup is completely contained in a single cluster.

This is how I usually do it:

# Connect to the clusters
import ipyparallel as ipp
client = ipp.Client()
lview  = client.load_balanced_view()
lview.block = True
CORES = len(client[:])

# Define the scatter_by function
def scatter_by(df, grouper, name='df'):
    sz = df.groupby([grouper]).size().sort_values().index.unique()
    for core in range(CORES):
        ids = sz[core::CORES]
        print("Pushing {0} {1}s into cluster {2}...".format(len(ids), grouper, core))
        client[core].push({name: df[df[grouper].isin(ids)]})

# Scatter the dataframe df grouping by `year`
scatter_by(df,'year')

Note that the scatter function I propose ensures that each cluster will host a similar number of observations, which is usually a good idea.
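The balancing comes from the round-robin slice sz[core::CORES], which deals the group keys out across the engines. A minimal illustration with plain lists (no cluster or DataFrame needed; the names here are made up):

```python
# Seven group keys, as if sorted by group size, dealt across 3 engines.
groups = ["a", "b", "c", "d", "e", "f", "g"]
CORES = 3

# Round-robin assignment: engine k gets every CORES-th key starting at k.
assignment = {core: groups[core::CORES] for core in range(CORES)}
print(assignment)  # -> {0: ['a', 'd', 'g'], 1: ['b', 'e'], 2: ['c', 'f']}
```

Every key lands on exactly one engine, and the per-engine counts differ by at most one, so each subgroup stays whole while the load stays roughly even.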