`ValueError: cannot reindex from a duplicate axis` using Dask DataFrame
`ValueError: cannot reindex from a duplicate axis` using Dask DataFrame
我一直在尝试调整我的代码以利用 Dask 来利用多台机器进行处理。虽然初始数据加载并不耗时,但后续处理在 8 核 i5 上大约需要 12 小时。这并不理想,并且认为使用 Dask 来帮助跨机器分散处理将是有益的。以下代码适用于标准 Pandas 方法:
import pandas as pd
artists = pd.read_csv("artists.csv")
print(f"... shape before cleaning {artists.shape}")
artists["name"] = artists["name"].astype("str")
artists["name"] = (
artists["name"]
.str.encode("ascii", "ignore")
.str.decode("ascii")
.str.lower()
.str.replace("&", " and ", regex=False)
.str.strip()
)
转换为 Dask 似乎很简单,但我在这个过程中遇到了一些问题。以下 Dask 改编代码会引发 ValueError: cannot reindex from a duplicate axis
错误:
import dask.dataframe as dd
from dask.distributed import Client
artists = dd.read_csv("artists.csv")
print(f"... shape before cleaning {artists.shape}")
artists["name"] = artists["name"].astype(str).compute()
artists["name"] = (
artists["name"]
.str.encode("ascii", "ignore")
.str.decode("ascii")
.str.lower()
.str.replace("&", " and ", regex=False)
.str.strip().compute()
)
if __name__ == '__main__':
client = Client()
我能辨别出的最好情况是 Dask 不允许重新分配给现有的 Dask DataFrame。所以这有效:
...
artists_new = artists["name"].astype("str").compute()
...
但是,我真的不想每次都创建一个新的DataFrame。我宁愿用新的 DataFrame 替换现有的 DataFrame,主要是因为我在处理之前有多个数据清理步骤。
虽然教程和指南很有用,但它们非常基础,不涵盖此类用例。
这里使用 Dask DataFrames 的首选方法是什么?
每次你在 Dask dataframe/series 上调用 .compute()
时,它都会将其转换为 pandas
。那么这一行发生了什么
artists["name"] = artists["name"].astype(str).compute()
是您正在计算字符串列,然后将 pandas
系列分配给 dask 系列(不确保分区对齐)。解决方案是仅在最终结果上调用 .compute()
,而中间步骤可以使用常规 pandas
语法:
# modified example (.compute is removed)
artists["name"] = artists["name"].astype(str).str.lower()
我一直在尝试调整我的代码以利用 Dask 来利用多台机器进行处理。虽然初始数据加载并不耗时,但后续处理在 8 核 i5 上大约需要 12 小时。这并不理想,并且认为使用 Dask 来帮助跨机器分散处理将是有益的。以下代码适用于标准 Pandas 方法:
import pandas as pd
artists = pd.read_csv("artists.csv")
print(f"... shape before cleaning {artists.shape}")
artists["name"] = artists["name"].astype("str")
artists["name"] = (
artists["name"]
.str.encode("ascii", "ignore")
.str.decode("ascii")
.str.lower()
.str.replace("&", " and ", regex=False)
.str.strip()
)
转换为 Dask 似乎很简单,但我在这个过程中遇到了一些问题。以下 Dask 改编代码会引发 ValueError: cannot reindex from a duplicate axis
错误:
import dask.dataframe as dd
from dask.distributed import Client
artists = dd.read_csv("artists.csv")
print(f"... shape before cleaning {artists.shape}")
artists["name"] = artists["name"].astype(str).compute()
artists["name"] = (
artists["name"]
.str.encode("ascii", "ignore")
.str.decode("ascii")
.str.lower()
.str.replace("&", " and ", regex=False)
.str.strip().compute()
)
if __name__ == '__main__':
client = Client()
我能辨别出的最好情况是 Dask 不允许重新分配给现有的 Dask DataFrame。所以这有效:
...
artists_new = artists["name"].astype("str").compute()
...
但是,我真的不想每次都创建一个新的DataFrame。我宁愿用新的 DataFrame 替换现有的 DataFrame,主要是因为我在处理之前有多个数据清理步骤。
虽然教程和指南很有用,但它们非常基础,不涵盖此类用例。
这里使用 Dask DataFrames 的首选方法是什么?
每次你在 Dask dataframe/series 上调用 .compute()
时,它都会将其转换为 pandas
。那么这一行发生了什么
artists["name"] = artists["name"].astype(str).compute()
是您正在计算字符串列,然后将 pandas
系列分配给 dask 系列(不确保分区对齐)。解决方案是仅在最终结果上调用 .compute()
,而中间步骤可以使用常规 pandas
语法:
# modified example (.compute is removed)
artists["name"] = artists["name"].astype(str).str.lower()