df.groupby(...).apply(...).reset_index() 在 dask 数据框中

df.groupby(...).apply(...).reset_index() in dask dataframe

我想使用两个 Dask DataFrame 来处理大型 csv 文件,我需要在加入之前对一个 DataFrame 执行 groupby(...).apply(...).reset_index()它与另一个:

import pandas as pd
import dask.dataframe as dd

dfA = pd.DataFrame({'x': ["x1", "x2", "x2", "x1", "x3", "x2"],
                   'y': ["A", "B", "C", "B", "D", "E"]})
ddfA = dd.from_pandas(dfA, npartitions=2)

gA = ddfA.groupby('x').y.apply(list, meta=('y', 'str')).reset_index()

dfB = pd.DataFrame({'x': ["x1", "x2", "x3"],
                   'z': ["U", "V", "W"]})
ddfB = dd.from_pandas(dfB, npartitions=2)


gA.merge(ddfB, how='left', on='x')

不幸的是,我有一个 keyError:'x'。谁能帮我解决这个问题?

我不确定你想要的输出是什么,但如果你改变行的顺序你可以做到:

import pandas as pd
import dask.dataframe as dd

dfA = pd.DataFrame({'x': ["x1", "x2", "x2", "x1", "x3", "x2"],
                   'y': ["A", "B", "C", "B", "D", "E"]})

dfB = pd.DataFrame({'x': ["x1", "x2", "x3"],
                   'z': ["U", "V", "W"]})


gA = dfA.merge(dfB, how='left', on='x')
gA = dd.from_pandas(gA, npartitions=2)
gA


    x   y   z
npartitions=2           
0   object  object  object
3   ... ... ...
5   ... ... ...

看起来 agg(list) 有助于解决问题。

dfA = pd.DataFrame(
    {"x": ["x1", "x2", "x2", "x1", "x3", "x2"], "y": ["A", "B", "C", "B", "D", "E"]}
)
ddfA = dd.from_pandas(dfA, npartitions=2)

gA = ddfA.groupby("x").y.agg(list).reset_index()

dfB = pd.DataFrame({"x": ["x1", "x2", "x3"], "z": ["U", "V", "W"]})
ddfB = dd.from_pandas(dfB, npartitions=2)

print(gA.merge(ddfB, on="x", how="left").compute())

    x          y  z
0  x1     [A, B]  U
1  x2  [B, C, E]  V
2  x3        [D]  W

如果其中一个数据帧比另一个小,您可能需要研究广播连接,因为它的性能会更高。