python dask 数据帧 - 将 groupby.apply 输出连接到单个数据帧

python dask dataframes - concatenate groupby.apply output to a single data frame

我正在使用 dask dataframe.groupby().apply() 并获得一个 dask 系列作为 return 值。 我将每个组都列为一个列表三元组,例如 (a,b,1),然后希望将所有三元组变成一个单一的 dask 数据框

我在映射函数的末尾使用这段代码 return 三胞胎作为 dask df

#assume here that trips is a generator for tripletes such as you would produce from itertools.product([l1,l2,l3])
trip = list(itertools.chain.from_iterable(trip))
df = pd.DataFrame.from_records(trip)
return dd.from_pandas(df,npartitions=1)

然后当我尝试使用类似于 pandas concat with dask concatenate

的东西时

假设apply函数的结果是变量result。 我正在尝试使用 将 dask.dataframe 导入为 dd dd.concat(结果,轴=0

得到错误

raise TypeError("dfs must be a list of DataFrames/Series objects") TypeError: dfs must be a list of DataFrames/Series objects

但是当我使用

检查结果类型时
print type(result)

我明白了

output: class 'dask.dataframe.core.Series'

将函数应用于一组 dask groupby 对象并将所有结果放入一个数据框中的正确方法是什么?

谢谢

编辑:---------------------------------------- ------------------ 为了产生用例,假设这个假数据生成

import random
import pandas as pd
import dask.dataframe as dd
people = [[random.randint(1,3), random.randint(1,3), random.randint(1,3)] for i in range(1000)]
ddf = dd.from_pandas(pd.DataFrame.from_records(people, columns=["first name", "last name", "cars"]), npartitions=1)

现在我的任务是按名字和姓氏对人进行分组(例如,所有具有相同名字和姓氏的人)然后我需要获得一个新的 dask 数据框,其中将包含每组多少辆汽车有。

假设 apply 函数可以 return 一系列元组列表,例如 [(name,name,cars count),(name,name,cars count)] 或具有相同元组的数据框列 - 姓名、姓名、汽车数量。

是的,我知道可以用另一种方式解决特定用例,但请相信我,我的用例更复杂。但我不能分享数据,也不能生成任何类似的数据。所以让我们使用一个虚拟数据:-)

挑战在于将应用的所有结果连接到单个 dask 数据帧中(pandas 数据帧在这里将是一个问题,数据将不适合内存 - 因此通过 pandas数据框会出问题)

对我来说,如果 apply 的输出是 pandas DataFrame,那么如果需要,最后转换为 dask DataFrame:

def f(x):
    trip = ((1,2,x) for x in range(3))
    df = pd.DataFrame.from_records(trip)
    return df

df1 = ddf.groupby('cars').apply(f, meta={'x': 'i8', 'y': 'i8', 'z': 'i8'}).compute()
#only for remove MultiIndex
df1 = df1.reset_index()
print (df1)
   cars  level_1  x  y  z
0     1        0  1  2  0
1     1        1  1  2  1
2     1        2  1  2  2
3     2        0  1  2  0
4     2        1  1  2  1
5     2        2  1  2  2
6     3        0  1  2  0
7     3        1  1  2  1
8     3        2  1  2  2

ddf1 = dd.from_pandas(df1,npartitions=1)
print (ddf1)
                cars level_1      x      y      z
npartitions=1                                    
0              int64   int64  int64  int64  int64
8                ...     ...    ...    ...    ...
Dask Name: from_pandas, 1 tasks

编辑:

L = []
def f(x):
    trip = ((1,2,x) for x in range(3))
    #append each
    L.append(da.from_array(np.array(list(trip)), chunks=(1,3)))

ddf.groupby('cars').apply(f, meta={'x': 'i8', 'y': 'i8', 'z': 'i8'}).compute()
dar =  da.concatenate(L, axis=0)
print (dar)
dask.array<concatenate, shape=(12, 3), dtype=int32, chunksize=(1, 3)>

供您编辑:

In [8]: ddf.groupby(['first name', 'last name']).cars.count().compute()
Out[8]:
first name  last name
1           1            107
            2            107
            3            110
2           1            117
            2            120
            3             99
3           1            119
            2            103
            3            118
Name: cars, dtype: int64