使用导致 ArrowInvalid 的 dask 并行保存到同一个镶木地板文件
Saving to the same parquet file in parallel using dask leading to ArrowInvalid
我正在做一些模拟,在其中计算几个时间步长的一些东西。对于每个我想保存一个镶木地板文件,其中每一行对应一个模拟,看起来像这样:
def simulation():
nsim = 3
timesteps = [1,2]
data = {} #initialization not shown here
for i in nsim:
compute_stuff()
for j in timesteps:
data[str(j)]= compute_some_other_stuff()
return data
一旦我的字典 data
包含我的模拟结果(在 numpy 数组下),我将其转换为 dask.DataFrame
对象,然后使用 .to_parquet()
将它们保存到文件中方法如下:
def save(data):
for i in data.keys():
data[i] = pd.DataFrame(data[i], bins=...)
df = from_pandas(data[i], npartitions=2)
f.to_parquet(datafolder + i + "/", engine="pyarrow", append=True, ignore_divisions = True)
当仅使用此代码时,它可以完美运行,而当我尝试并行实现它时就会遇到困难。我使用 dask:
client = Client(n_workers=10, processes=True)
def f():
data = simulation()
save(data)
to_compute = [delayed(f)(n) for n in range(20)]
compute(to_compute)
这最后一部分代码的行为非常随机。在某些时候会发生这种情况:
distributed.worker - WARNING - Compute Failed
Function: f
args: (4)
kwargs: {}
Exception: "ArrowInvalid('Parquet file size is 0 bytes')"
....
distributed.worker - WARNING - Compute Failed
Function: f
args: (12)
kwargs: {}
Exception: "ArrowInvalid('Parquet magic bytes not found in footer. Either the file is corrupted or this is not a parquet file.')"
我认为这些错误是由于 2 个进程试图同时写入同一个 parquet 文件,并且没有很好地处理(因为它可以在 txt 文件上)。我已经尝试切换到 pySpark / Koalas 但没有成功。我有更好的方法来保存模拟结果(以防集群崩溃/挂起时间)吗?
您正在犯一个经典的 dask 错误,即从本身延迟的函数中调用 dask API。该错误表明事情正在并行发生(这就是 dask 所做的!),预计在处理过程中不会发生变化。具体来说,一个文件显然正在被一个任务编辑,而另一个任务正在读取它(不确定是哪个)。
您可能想要做的是在数据框片段上使用 concat
,然后调用 to_parquet。
请注意,您的所有数据似乎实际上都保存在客户端中,并且您正在使用 from_parquet。这似乎是个坏主意,因为您错过了 dask 最大的功能之一,即仅在需要时加载数据。相反,您应该将数据加载到延迟函数或 dask 数据帧 API 调用中。
我正在做一些模拟,在其中计算几个时间步长的一些东西。对于每个我想保存一个镶木地板文件,其中每一行对应一个模拟,看起来像这样:
def simulation():
nsim = 3
timesteps = [1,2]
data = {} #initialization not shown here
for i in nsim:
compute_stuff()
for j in timesteps:
data[str(j)]= compute_some_other_stuff()
return data
一旦我的字典 data
包含我的模拟结果(在 numpy 数组下),我将其转换为 dask.DataFrame
对象,然后使用 .to_parquet()
将它们保存到文件中方法如下:
def save(data):
for i in data.keys():
data[i] = pd.DataFrame(data[i], bins=...)
df = from_pandas(data[i], npartitions=2)
f.to_parquet(datafolder + i + "/", engine="pyarrow", append=True, ignore_divisions = True)
当仅使用此代码时,它可以完美运行,而当我尝试并行实现它时就会遇到困难。我使用 dask:
client = Client(n_workers=10, processes=True)
def f():
data = simulation()
save(data)
to_compute = [delayed(f)(n) for n in range(20)]
compute(to_compute)
这最后一部分代码的行为非常随机。在某些时候会发生这种情况:
distributed.worker - WARNING - Compute Failed
Function: f
args: (4)
kwargs: {}
Exception: "ArrowInvalid('Parquet file size is 0 bytes')"
....
distributed.worker - WARNING - Compute Failed
Function: f
args: (12)
kwargs: {}
Exception: "ArrowInvalid('Parquet magic bytes not found in footer. Either the file is corrupted or this is not a parquet file.')"
我认为这些错误是由于 2 个进程试图同时写入同一个 parquet 文件,并且没有很好地处理(因为它可以在 txt 文件上)。我已经尝试切换到 pySpark / Koalas 但没有成功。我有更好的方法来保存模拟结果(以防集群崩溃/挂起时间)吗?
您正在犯一个经典的 dask 错误,即从本身延迟的函数中调用 dask API。该错误表明事情正在并行发生(这就是 dask 所做的!),预计在处理过程中不会发生变化。具体来说,一个文件显然正在被一个任务编辑,而另一个任务正在读取它(不确定是哪个)。
您可能想要做的是在数据框片段上使用 concat
,然后调用 to_parquet。
请注意,您的所有数据似乎实际上都保存在客户端中,并且您正在使用 from_parquet。这似乎是个坏主意,因为您错过了 dask 最大的功能之一,即仅在需要时加载数据。相反,您应该将数据加载到延迟函数或 dask 数据帧 API 调用中。