使用导致 ArrowInvalid 的 dask 并行保存到同一个镶木地板文件

Saving to the same parquet file in parallel using dask leading to ArrowInvalid

我正在做一些模拟,在其中计算几个时间步长的一些东西。对于每个我想保存一个镶木地板文件,其中每一行对应一个模拟,看起来像这样:

def simulation():
    nsim = 3
    timesteps = [1,2]
    data = {} #initialization not shown here
    for i in nsim:
        compute_stuff()
        for j in timesteps:
            data[str(j)]= compute_some_other_stuff()

    return data

一旦我的字典 data 包含我的模拟结果(在 numpy 数组下),我将其转换为 dask.DataFrame 对象,然后使用 .to_parquet() 将它们保存到文件中方法如下:

def save(data):
    for i in data.keys():
        data[i] = pd.DataFrame(data[i], bins=...)
        df = from_pandas(data[i], npartitions=2)
        f.to_parquet(datafolder + i + "/", engine="pyarrow", append=True, ignore_divisions = True)

当仅使用此代码时,它可以完美运行,而当我尝试并行实现它时就会遇到困难。我使用 dask:

client = Client(n_workers=10, processes=True)

def f():
   data = simulation()
   save(data)

to_compute = [delayed(f)(n) for n in range(20)]
compute(to_compute)

这最后一部分代码的行为非常随机。在某些时候会发生这种情况:

distributed.worker - WARNING - Compute Failed
Function:  f
args:      (4)
kwargs:    {}
Exception: "ArrowInvalid('Parquet file size is 0 bytes')"
....

distributed.worker - WARNING - Compute Failed
Function:  f
args:      (12)
kwargs:    {}
Exception: "ArrowInvalid('Parquet magic bytes not found in footer. Either the file is corrupted or this is not a parquet file.')"

我认为这些错误是由于 2 个进程试图同时写入同一个 parquet 文件,并且没有很好地处理(因为它可以在 txt 文件上)。我已经尝试切换到 pySpark / Koalas 但没有成功。我有更好的方法来保存模拟结果(以防集群崩溃/挂起时间)吗?

您正在犯一个经典的 dask 错误,即从本身延迟的函数中调用 dask API。该错误表明事情正在并行发生(这就是 dask 所做的!),预计在处理过程中不会发生变化。具体来说,一个文件显然正在被一个任务编辑,而另一个任务正在读取它(不确定是哪个)。

您可能想要做的是在数据框片段上使用 concat,然后调用 to_parquet。

请注意,您的所有数据似乎实际上都保存在客户端中,并且您正在使用 from_parquet。这似乎是个坏主意,因为您错过了 dask 最大的功能之一,即仅在需要时加载数据。相反,您应该将数据加载到延迟函数或 dask 数据帧 API 调用中。