`ValueError: cannot reindex from a duplicate axis` using Dask DataFrame

`ValueError: cannot reindex from a duplicate axis` using Dask DataFrame

我一直在尝试调整我的代码以利用 Dask 来利用多台机器进行处理。虽然初始数据加载并不耗时,但后续处理在 8 核 i5 上大约需要 12 小时。这并不理想,并且认为使用 Dask 来帮助跨机器分散处理将是有益的。以下代码适用于标准 Pandas 方法:

import pandas as pd
artists = pd.read_csv("artists.csv")
print(f"... shape before cleaning {artists.shape}")

artists["name"] = artists["name"].astype("str")

artists["name"] = (
    artists["name"]
    .str.encode("ascii", "ignore")
    .str.decode("ascii")
    .str.lower()
    .str.replace("&", " and ", regex=False)
    .str.strip()
)

转换为 Dask 似乎很简单,但我在这个过程中遇到了一些问题。以下 Dask 改编代码会引发 ValueError: cannot reindex from a duplicate axis 错误:

import dask.dataframe as dd
from dask.distributed import Client

artists = dd.read_csv("artists.csv")
print(f"... shape before cleaning {artists.shape}")

artists["name"] = artists["name"].astype(str).compute()
artists["name"] = (
    artists["name"]
    .str.encode("ascii", "ignore")
    .str.decode("ascii")
    .str.lower()
    .str.replace("&", " and ", regex=False)
    .str.strip().compute()
)

if __name__ == '__main__':
    client = Client()

我能辨别出的最好情况是 Dask 不允许重新分配给现有的 Dask DataFrame。所以这有效:

...
artists_new = artists["name"].astype("str").compute()
...

但是,我真的不想每次都创建一个新的DataFrame。我宁愿用新的 DataFrame 替换现有的 DataFrame,主要是因为我在处理之前有多个数据清理步骤。

虽然教程和指南很有用,但它们非常基础,不涵盖此类用例。

这里使用 Dask DataFrames 的首选方法是什么?

每次你在 Dask dataframe/series 上调用 .compute() 时,它都会将其转换为 pandas。那么这一行发生了什么

artists["name"] = artists["name"].astype(str).compute()

是您正在计算字符串列,然后将 pandas 系列分配给 dask 系列(不确保分区对齐)。解决方案是仅在最终结果上调用 .compute(),而中间步骤可以使用常规 pandas 语法:

# modified example (.compute is removed)
artists["name"] = artists["name"].astype(str).str.lower()