如何将 pandas 数据框传递给分布式工作人员?

How to pass a pandas dataframe to dask distributed workers?

我正在尝试将一个大的 pandas 数据帧作为函数参数传递给 dask 分布式工作人员。我尝试了什么(X 是我的数据框):

1 将数据直接传递给函数:

def test(X):
    return X
f=client.submit(test, X)
f.result()

2 在初始化函数中保存数据帧。

def worker_init(r_X):
    global X
    X=r_X
client.run(worker_init,X,y)

3 将数据帧分散到所有节点,然后通过 futures 使用它

def test(X):
    return X
f_X = client.scatter(X, broadcast=True)
f = client.submit(test,f_X)
f.result()

None 的变体适用于我的情况。变体 1 和 2 的工作原理几乎相同。 dask-scheduler 会增加每个任务的内存,并且在内存不足且任务失败之前永远不会释放它。

变体 3 不起作用,因为我没有传递 pandas 数据帧,而是得到了一些垃圾。

如何将数据帧发送给工作人员并且调度程序上没有 MemoryError?

应该是内存高效的变体 3 的完整代码,但甚至没有传递数据帧:

import pandas as pd
import numpy as np
from distributed import Client
client = Client('localhost:8786')
X = np.random.rand(10000,100)
X=pd.DataFrame(X)
f_X = client.scatter(X, broadcast=True)
def test(X):
    return X
f = client.submit(test,f_X)
f.result()[:10]

[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]

client.scatter 检查输入列表,因此当您传入数据帧时,您不小心将其解压缩到一系列列表中。你应该做 f_X = client.scatter([X], broadcast=True)

现在每个工人都有一个数据框。这里 f_X 也是一个列表,包含一个未来,所以你会想要 f = client.submit(test,f_X[0]).

一般来说,如果您可以 generate/load 您的数据在 worker 的函数中,而不是从您的客户端传递它们,那会更好,这显然需要将整个东西放入本地内存,复制该数据,以及整个过程中的序列化成本。