Dask 中是否有类似共享内存的大型对象多处理作业?

Is there any something like shared-memory in Dask for large object multiprocessing job?

在回归测试中,我得到了一个 1000*100000 pandas 数据框,如下所示:

df=pd.DataFrame(np.random.random((1000,100)))

第一列是y标签,其他是x1-x99。我需要挑选出三到七个 var-x 来拟合 y ,运行 每次回归,获取所有输出并找到最佳选择。

我发现在 Ray 项目中通过调用 ray.put(object),大数组存储在共享内存中,所有工作进程都可以访问它而无需创建副本。

有太多的场合(161700+3921225+....),只读基本数据帧就可以了,因为这些工人之间没有交流,他们只需要return输出到主要的。

Dask 中是否有类似的东西来避免将数据复制到每个 worker 中? 它可能是这样的:

dask.put(df)

然后每个工人可能会像这样阅读他们自己的工作:

from itertools import combinations
rt=[]
for c in combinations(range(100),3):
    (i,j,k)=c
    rt.append(model(df.iloc[:,0],df.iloc[:,[i,j,k]]).fit())
rt=dask.compute(*rt)

这样可以避免在 main 中创建每个 y,X 副本并将每个 y,X 发送给所有工作人员?

Ray 在后台使用 PyArrow Plasma 在单个机器的上下文中存储共享数据。

虽然 Dask 没有明确支持 Plasma,但您可以很容易地使用它来存储和读取辅助函数中的共享数据。如果 worker 函数知道存储数据的 Plasma ObjectId,您可以从 Plasma 检索数据。

Plasma 示例代码 here.