Dask 中是否有类似共享内存的大型对象多处理作业?
Is there any something like shared-memory in Dask for large object multiprocessing job?
在回归测试中,我得到了一个 1000*100000 pandas 数据框,如下所示:
df=pd.DataFrame(np.random.random((1000,100)))
第一列是y标签,其他是x1-x99。我需要挑选出三到七个 var-x 来拟合 y ,运行 每次回归,获取所有输出并找到最佳选择。
我发现在 Ray 项目中通过调用 ray.put(object)
,大数组存储在共享内存中,所有工作进程都可以访问它而无需创建副本。
有太多的场合(161700+3921225+....),只读基本数据帧就可以了,因为这些工人之间没有交流,他们只需要return输出到主要的。
Dask 中是否有类似的东西来避免将数据复制到每个 worker 中?
它可能是这样的:
dask.put(df)
然后每个工人可能会像这样阅读他们自己的工作:
from itertools import combinations
rt=[]
for c in combinations(range(100),3):
(i,j,k)=c
rt.append(model(df.iloc[:,0],df.iloc[:,[i,j,k]]).fit())
rt=dask.compute(*rt)
这样可以避免在 main 中创建每个 y,X 副本并将每个 y,X 发送给所有工作人员?
Ray 在后台使用 PyArrow Plasma 在单个机器的上下文中存储共享数据。
虽然 Dask 没有明确支持 Plasma,但您可以很容易地使用它来存储和读取辅助函数中的共享数据。如果 worker 函数知道存储数据的 Plasma ObjectId
,您可以从 Plasma 检索数据。
Plasma 示例代码 here.
在回归测试中,我得到了一个 1000*100000 pandas 数据框,如下所示:
df=pd.DataFrame(np.random.random((1000,100)))
第一列是y标签,其他是x1-x99。我需要挑选出三到七个 var-x 来拟合 y ,运行 每次回归,获取所有输出并找到最佳选择。
我发现在 Ray 项目中通过调用 ray.put(object)
,大数组存储在共享内存中,所有工作进程都可以访问它而无需创建副本。
有太多的场合(161700+3921225+....),只读基本数据帧就可以了,因为这些工人之间没有交流,他们只需要return输出到主要的。
Dask 中是否有类似的东西来避免将数据复制到每个 worker 中? 它可能是这样的:
dask.put(df)
然后每个工人可能会像这样阅读他们自己的工作:
from itertools import combinations
rt=[]
for c in combinations(range(100),3):
(i,j,k)=c
rt.append(model(df.iloc[:,0],df.iloc[:,[i,j,k]]).fit())
rt=dask.compute(*rt)
这样可以避免在 main 中创建每个 y,X 副本并将每个 y,X 发送给所有工作人员?
Ray 在后台使用 PyArrow Plasma 在单个机器的上下文中存储共享数据。
虽然 Dask 没有明确支持 Plasma,但您可以很容易地使用它来存储和读取辅助函数中的共享数据。如果 worker 函数知道存储数据的 Plasma ObjectId
,您可以从 Plasma 检索数据。
Plasma 示例代码 here.