How to best share static data between ipyparallel client and remote engines?
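I run the same simulation in a loop with different parameters. Each simulation uses a pandas DataFrame (data) that is only read, never modified. Using ipyparallel (IPython parallel), I can put this DataFrame into the global variable space of every engine in my view before the simulations start: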
view['data'] = data
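The engines then have access to the DataFrame for every simulation that gets run on them. Copying the data (data is 40MB when pickled) takes only a few seconds. However, as the number of simulations grows, memory usage seems to grow very large. My guess is that this shared data is being copied for every task rather than just once per engine. What is the best practice for sharing static, read-only data from a client with the engines? Copying it once per engine is acceptable, but ideally it would only need to be copied once per host (I have 4 engines on host1 and 8 engines on host2).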
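To put numbers on the three options, here is a back-of-the-envelope comparison. It assumes every copy costs the full 40MB pickled size (the in-memory DataFrame can be larger) and uses the 4 × 5 × 6 = 120 tweak combinations produced by the nested loops in the code below; the constant names are only for illustration.

```python
DATA_MB = 40                               # pickled size of `data`, from the question
ENGINES_PER_HOST = {"host1": 4, "host2": 8}
N_TASKS = 4 * 5 * 6                        # tweak dicts built by the nested loops


def copy_cost_mb(copies):
    """Total megabytes moved/held if `data` is duplicated `copies` times."""
    return copies * DATA_MB


per_task = copy_cost_mb(N_TASKS)                          # one copy per simulation
per_engine = copy_cost_mb(sum(ENGINES_PER_HOST.values())) # one copy per engine
per_host = copy_cost_mb(len(ENGINES_PER_HOST))            # one copy per machine
print("per task:", per_task, "MB")      # prints per task: 4800 MB
print("per engine:", per_engine, "MB")  # prints per engine: 480 MB
print("per host:", per_host, "MB")      # prints per host: 80 MB
```

With ~1000 simulations the per-task figure grows tenfold, while the per-engine and per-host figures do not depend on the task count at all.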
Here is my code:
from ipyparallel import Client
import pandas as pd
rc = Client()
view = rc[:]  # use all engines
view.scatter('id', rc.ids, flatten=True)  # So we can track which engine performed what task
def do_simulation(tweaks):
    """ Run simulation with specified tweaks """
    # Do sim stuff using the global data DataFrame, producing `results`
    results = None  # placeholder for the real simulation output
    return results, id, tweaks
if __name__ == '__main__':
    # `engine` (a database connection) and `out_file_path` are defined elsewhere
    data = pd.read_sql("SELECT * FROM my_table", engine)
    threads = []  # store list of tweaks dicts
    for i in range(4):
        for j in range(5):
            for k in range(6):
                threads.append(dict(i=i, j=j, k=k))
    # Set up globals for each engine. This is the read-only DataFrame
    view['data'] = data
    ar = view.map_async(do_simulation, threads)
    # Our async results should pop up over time. Let's measure our progress:
    for idx, (results, id, tweaks) in enumerate(ar):
        print('Progress: {}%: Simulation {} finished on engine {}'.format(100.0 * ar.progress / len(ar), idx, id))
        # Store results as a pickle for the future (one file per i/j/k combination)
        pfile = '{}_{}_{}.pickle'.format(tweaks['i'], tweaks['j'], tweaks['k'])
        # Save our results to a pickle file
        pd.to_pickle(results, out_file_path + pfile)
    print('Total execution time: {} (serial time: {})'.format(ar.wall_time, ar.serial_time))
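If the simulation count is small (~50), it takes a while to get started, but I do start to see the progress print statements. Strangely, multiple tasks get assigned to the same engine, and I don't see a response until all of the tasks assigned to that engine have completed. I would expect to see a response from enumerate(ar) every time a single simulation task completes.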
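If the simulation count is large (~1000), it takes a long time to get started. I see the CPUs throttle up on all engines, but no progress print statements appear for a long time (~40 minutes). When I do see progress, it appears that a large block (>100) of tasks went to the same engine, and progress waited for that one engine to finish. When that engine did finish, the ar object provided a new response every 4 seconds; this may have been the time it took to write the output pickle files.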
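Finally, host1 also runs the ipcontroller task, and its memory usage climbs dramatically (a Python task shows >6GB of RAM in use, a kernel task shows 3GB). The host2 engines don't show much memory usage at all. What could cause this memory spike?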
I used this logic in some code a couple of years ago, and I got it working by following this. My code was something like this:
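import ipyparallel as ipp
engines = ipp.Client()  # connect to the running cluster
shared_dict = {
    # big dict with ~10k keys, each with a list of dicts
}
balancer = engines.load_balanced_view()
with engines[:].sync_imports():  # your 'view' variable
    import pandas as pd
    import ujson as json
# push() copies every key/value of shared_dict into each engine's namespace, once per engine
engines[:].push(shared_dict)
# `my_func` and `id` (the list of task inputs) are defined elsewhere
results = balancer.map(lambda i: (i, my_func(i)), id)
results_data = results.get()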
> If simulation counts are small (~50), then it takes a while to get started, but I start to see progress print statements. Strangely, multiple tasks will get assigned to the same engine and I don't see a response until all of those assigned tasks are completed for that engine. I would expect to see a response from enumerate(ar) every time a single simulation task completes.
In my case, my_func() was a complex method that wrote lots of log messages to a file, so that served as my print statements.
As for task assignment, since I used load_balanced_view(), I left it to the library to work out, and it did a great job.
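A rough, cluster-free illustration of why the question's results arrive in blocks. It assumes (as I understand ipyparallel's default behaviour) that DirectView.map_async cuts the task list into one contiguous block per engine and each engine returns its whole block as a single reply, whereas load_balanced_view().map hands tasks out one at a time. contiguous_partition is a hypothetical helper written for this sketch, not ipyparallel's own code:

```python
def contiguous_partition(tasks, n_engines):
    """Split `tasks` into n_engines contiguous blocks whose sizes differ by at most one.

    Assumed model of a DirectView map: block k goes to engine k as a single job,
    so all of that engine's results come back together when the block finishes.
    """
    base, extra = divmod(len(tasks), n_engines)
    blocks, start = [], 0
    for engine in range(n_engines):
        size = base + (1 if engine < extra else 0)  # the first `extra` engines get one more
        blocks.append(tasks[start:start + size])
        start += size
    return blocks


# 1000 simulations over the 12 engines (4 on host1 + 8 on host2)
blocks = contiguous_partition(list(range(1000)), 12)
print([len(b) for b in blocks])
# prints [84, 84, 84, 84, 83, 83, 83, 83, 83, 83, 83, 83]
```

Because enumerate(ar) walks the results in submission order, under this model nothing can be printed until the engine holding the first block has finished all of its ~84 tasks. With one task per job, results come back individually instead of in blocks of that size.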
> If simulation counts are large (~1000), it takes a long time to get started, I see the CPUs throttle up on all engines, but no progress print statements are seen until a long time (~40mins), and when I do see progress, it appears a large block (>100) of tasks went to same engine, and awaited completion from that one engine before providing some progress. When that one engine did complete, I saw the ar object provided new responses every 4 secs; this may have been the time delay to write the output pickle files.
I never experienced delays that long, so I can't really say much about this.
I hope this helps with your problem.
PS: As I said in the comments, you could try multiprocessing.Pool. I admit I haven't tried sharing big, read-only data as a global variable with it, but I would give it a try, because it seems to work.
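A minimal single-machine sketch of that multiprocessing.Pool idea. It uses the Pool initializer to hand the read-only data to each worker process once (not once per task), and imap_unordered with chunksize=1 so each result is reported as soon as it finishes. Assumptions: everything runs on one host (a Pool does not span host1 and host2), a plain list stands in for the DataFrame, and simulate and run_all are hypothetical names:

```python
import multiprocessing as mp

# Per-process slot for the read-only data; each worker fills it exactly once.
_shared_data = None


def _init_worker(data):
    """Pool initializer: runs once in every worker process, not once per task."""
    global _shared_data
    _shared_data = data


def simulate(tweaks):
    """Hypothetical stand-in for do_simulation: only reads the shared data."""
    scale = tweaks["i"] + tweaks["j"] + tweaks["k"]
    return tweaks, scale * sum(_shared_data)


def run_all(data, tweaks_list, processes=4):
    """Yield (tweaks, result) pairs as soon as each individual task finishes."""
    with mp.Pool(processes=processes, initializer=_init_worker, initargs=(data,)) as pool:
        # chunksize=1 hands out one task at a time; imap_unordered yields results
        # in completion order instead of waiting for earlier tasks.
        yield from pool.imap_unordered(simulate, tweaks_list, chunksize=1)


if __name__ == "__main__":
    data = list(range(10))  # stand-in for the read-only DataFrame
    tweaks_list = [dict(i=i, j=j, k=k) for i in range(4) for j in range(5) for k in range(6)]
    for n, (tweaks, value) in enumerate(run_all(data, tweaks_list), start=1):
        print("Progress: {:.0f}%: {} -> {}".format(100.0 * n / len(tweaks_list), tweaks, value))
```

The `if __name__ == "__main__":` guard matters on platforms that start workers with spawn or forkserver, where the main module is re-imported in each child.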
Sometimes you need to scatter data grouped by a category, so that each subgroup is contained entirely within a single cluster.
This is how I usually do it:
# Connect to the clusters
import ipyparallel as ipp
client = ipp.Client()
lview = client.load_balanced_view()
lview.block = True
CORES = len(client[:])
# Define the scatter_by function
def scatter_by(df, grouper, name='df'):
    # Group labels ordered from smallest to largest group
    sz = df.groupby(grouper).size().sort_values().index.unique()
    for core, engine_id in enumerate(client.ids):
        # Deal the labels out round-robin so each engine gets a similar number of rows
        ids = sz[core::CORES]
        print("Pushing {0} {1}s into cluster {2}...".format(len(ids), grouper, engine_id))
        # All rows of a given group land on the same engine, under the variable `name`
        client[engine_id].push({name: df[df[grouper].isin(ids)]})
# Scatter the dataframe df (already loaded) grouping by `year`
scatter_by(df, 'year')
Note that the scatter function I suggest ensures that each cluster hosts a similar number of observations, which is usually a good idea.
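To see how dealing size-sorted groups out round-robin keeps the load roughly even, here is a pure-Python sketch of the same assignment logic with no cluster or pandas involved. deal_groups and the toy year labels are hypothetical. The balance is approximate rather than exact: later engines in each round receive the larger groups.

```python
from collections import Counter


def deal_groups(labels, n_engines):
    """Same idea as scatter_by: order groups by size, then deal them out round-robin.

    Returns {engine_index: [group labels]} and {engine_index: total rows}.
    """
    sizes = Counter(labels)
    ordered = sorted(sizes, key=sizes.get)  # smallest group first, like .sort_values()
    assignment = {e: ordered[e::n_engines] for e in range(n_engines)}
    rows = {e: sum(sizes[g] for g in groups) for e, groups in assignment.items()}
    return assignment, rows


# Hypothetical data: year 2000+s appears s times, for s = 1..8
labels = [2000 + s for s in range(1, 9) for _ in range(s)]
assignment, rows = deal_groups(labels, 2)
print(assignment)  # prints {0: [2001, 2003, 2005, 2007], 1: [2002, 2004, 2006, 2008]}
print(rows)        # prints {0: 16, 1: 20}
```

Every year ends up on exactly one engine, and the two engines hold 16 and 20 of the 36 rows.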