使用 `dask.merge()` 的 KeyError
KeyError using `dask.merge()`
所以我有两个 pandas 通过
创建的数据帧
df1 = pd.read_cvs("first1.csv")
df2 = pd.read_csv("second2.csv")
它们都有 column1
列。要仔细检查,
print(df1.columns)
print(df2.columns)
两个return列'column1'
。
因此,我想将这两个数据帧与 dask 合并,在本地使用 60 个线程(使用外部合并):
dd1 = dd.merge(df1, df2, on="column1", how="outer", suffixes=("","_repeat")).compute(num_workers=60)
失败并出现 KeyError,KeyError: 'column1'
Traceback (most recent call last):
File "INSTALLATIONPATH/python3.5/site-packages/pandas/indexes/base.py", line 2134, in get_loc
return self._engine.get_loc(key)
File "pandas/index.pyx", line 139, in pandas.index.IndexEngine.get_loc (pandas/index.c:4443)
File "pandas/index.pyx", line 161, in pandas.index.IndexEngine.get_loc (pandas/index.c:4289)
File "pandas/src/hashtable_class_helper.pxi", line 732, in pandas.hashtable.PyObjectHashTable.get_item (pandas/hashtable.c:13733)
File "pandas/src/hashtable_class_helper.pxi", line 740, in pandas.hashtable.PyObjectHashTable.get_item (pandas/hashtable.c:13687)
KeyError: 'column1'
我认为这是一个可并行化的任务,即 dd.merge(df1, df2, on='id')
这个有"dask-equivalent"操作吗?我还尝试重新索引 chr
上的 pandas 数据帧(即 df1 = df1.reset_index('chr')
),然后尝试加入索引
dd.merge(df1, df2, left_index=True, right_index=True)
那也没用,同样的错误。
根据你的错误,我会仔细检查你的初始数据框,以确保你在两个(没有额外的空格或任何东西)中都有 column1
作为实际的列,因为它应该工作正常(没有错误以下代码)
此外,在 pandas.DataFrame
或 Dask.dataframe
上调用合并是有区别的。
下面是一些示例数据:
df1 = pd.DataFrame(np.transpose([np.arange(1000),
np.arange(1000)]), columns=['column1','column1_1'])
df2 = pd.DataFrame(np.transpose([np.arange(1000),
np.arange(1000, 2000)]), columns=['column1','column1_2'])
他们的 dask
等价物:
ddf1 = dd.from_pandas(df1, npartitions=100)
ddf2 = dd.from_pandas(df2, npartitions=100)
使用pandas.DataFrame
:
In [1]: type(dd.merge(df1, df2, on="column1", how="outer"))
Out [1]: pandas.core.frame.DataFrame
所以这个returns一个pandas.DataFrame,所以你不能在上面调用compute()
。
使用dask.dataframe
:
In [2]: type(dd.merge(ddf1, ddf2, on="column1", how="outer"))
Out[2]: dask.dataframe.core.DataFrame
这里可以调用compute
:
In [3]: dd.merge(ddf1,ddf2, how='outer').compute(num_workers=60)
Out[3]:
column1 column1_1 column1_2
0 0 0 1000
1 400 400 1400
2 100 100 1100
3 500 500 1500
4 300 300 1300
旁注:根据您的数据和硬件的大小,您可能需要检查 pandas.join
是否会更快:
df1.set_index('column1').join(df2.set_index('column1'), how='outer').reset_index()
对每个 df 使用 (1 000 000, 2)
的大小,它比我硬件上的 dask 解决方案更快。
所以我有两个 pandas 通过
创建的数据帧df1 = pd.read_cvs("first1.csv")
df2 = pd.read_csv("second2.csv")
它们都有 column1
列。要仔细检查,
print(df1.columns)
print(df2.columns)
两个return列'column1'
。
因此,我想将这两个数据帧与 dask 合并,在本地使用 60 个线程(使用外部合并):
dd1 = dd.merge(df1, df2, on="column1", how="outer", suffixes=("","_repeat")).compute(num_workers=60)
失败并出现 KeyError,KeyError: 'column1'
Traceback (most recent call last):
File "INSTALLATIONPATH/python3.5/site-packages/pandas/indexes/base.py", line 2134, in get_loc
return self._engine.get_loc(key)
File "pandas/index.pyx", line 139, in pandas.index.IndexEngine.get_loc (pandas/index.c:4443)
File "pandas/index.pyx", line 161, in pandas.index.IndexEngine.get_loc (pandas/index.c:4289)
File "pandas/src/hashtable_class_helper.pxi", line 732, in pandas.hashtable.PyObjectHashTable.get_item (pandas/hashtable.c:13733)
File "pandas/src/hashtable_class_helper.pxi", line 740, in pandas.hashtable.PyObjectHashTable.get_item (pandas/hashtable.c:13687)
KeyError: 'column1'
我认为这是一个可并行化的任务,即 dd.merge(df1, df2, on='id')
这个有"dask-equivalent"操作吗?我还尝试重新索引 chr
上的 pandas 数据帧(即 df1 = df1.reset_index('chr')
),然后尝试加入索引
dd.merge(df1, df2, left_index=True, right_index=True)
那也没用,同样的错误。
根据你的错误,我会仔细检查你的初始数据框,以确保你在两个(没有额外的空格或任何东西)中都有 column1
作为实际的列,因为它应该工作正常(没有错误以下代码)
此外,在 pandas.DataFrame
或 Dask.dataframe
上调用合并是有区别的。
下面是一些示例数据:
df1 = pd.DataFrame(np.transpose([np.arange(1000),
np.arange(1000)]), columns=['column1','column1_1'])
df2 = pd.DataFrame(np.transpose([np.arange(1000),
np.arange(1000, 2000)]), columns=['column1','column1_2'])
他们的 dask
等价物:
ddf1 = dd.from_pandas(df1, npartitions=100)
ddf2 = dd.from_pandas(df2, npartitions=100)
使用pandas.DataFrame
:
In [1]: type(dd.merge(df1, df2, on="column1", how="outer"))
Out [1]: pandas.core.frame.DataFrame
所以这个returns一个pandas.DataFrame,所以你不能在上面调用compute()
。
使用dask.dataframe
:
In [2]: type(dd.merge(ddf1, ddf2, on="column1", how="outer"))
Out[2]: dask.dataframe.core.DataFrame
这里可以调用compute
:
In [3]: dd.merge(ddf1,ddf2, how='outer').compute(num_workers=60)
Out[3]:
column1 column1_1 column1_2
0 0 0 1000
1 400 400 1400
2 100 100 1100
3 500 500 1500
4 300 300 1300
旁注:根据您的数据和硬件的大小,您可能需要检查 pandas.join
是否会更快:
df1.set_index('column1').join(df2.set_index('column1'), how='outer').reset_index()
对每个 df 使用 (1 000 000, 2)
的大小,它比我硬件上的 dask 解决方案更快。