python dask 数据帧 - 将 groupby.apply 输出连接到单个数据帧
python dask dataframes - concatenate groupby.apply output to a single data frame
我正在使用 dask dataframe.groupby().apply()
并获得一个 dask 系列作为 return 值。
我将每个组都列为一个列表三元组,例如 (a,b,1),然后希望将所有三元组变成一个单一的 dask 数据框
我在映射函数的末尾使用这段代码 return 三胞胎作为 dask df
#assume here that trips is a generator for tripletes such as you would produce from itertools.product([l1,l2,l3])
trip = list(itertools.chain.from_iterable(trip))
df = pd.DataFrame.from_records(trip)
return dd.from_pandas(df,npartitions=1)
然后当我尝试使用类似于 pandas concat with dask concatenate
的东西时
假设apply函数的结果是变量result。
我正在尝试使用
将 dask.dataframe 导入为 dd
dd.concat(结果,轴=0
得到错误
raise TypeError("dfs must be a list of DataFrames/Series objects")
TypeError: dfs must be a list of DataFrames/Series objects
但是当我使用
检查结果类型时
print type(result)
我明白了
output: class 'dask.dataframe.core.Series'
将函数应用于一组 dask groupby 对象并将所有结果放入一个数据框中的正确方法是什么?
谢谢
编辑:---------------------------------------- ------------------
为了产生用例,假设这个假数据生成
import random
import pandas as pd
import dask.dataframe as dd
people = [[random.randint(1,3), random.randint(1,3), random.randint(1,3)] for i in range(1000)]
ddf = dd.from_pandas(pd.DataFrame.from_records(people, columns=["first name", "last name", "cars"]), npartitions=1)
现在我的任务是按名字和姓氏对人进行分组(例如,所有具有相同名字和姓氏的人)然后我需要获得一个新的 dask 数据框,其中将包含每组多少辆汽车有。
假设 apply 函数可以 return 一系列元组列表,例如 [(name,name,cars count),(name,name,cars count)] 或具有相同元组的数据框列 - 姓名、姓名、汽车数量。
是的,我知道可以用另一种方式解决特定用例,但请相信我,我的用例更复杂。但我不能分享数据,也不能生成任何类似的数据。所以让我们使用一个虚拟数据:-)
挑战在于将应用的所有结果连接到单个 dask 数据帧中(pandas 数据帧在这里将是一个问题,数据将不适合内存 - 因此通过 pandas数据框会出问题)
对我来说,如果 apply
的输出是 pandas DataFrame
,那么如果需要,最后转换为 dask DataFrame
:
def f(x):
trip = ((1,2,x) for x in range(3))
df = pd.DataFrame.from_records(trip)
return df
df1 = ddf.groupby('cars').apply(f, meta={'x': 'i8', 'y': 'i8', 'z': 'i8'}).compute()
#only for remove MultiIndex
df1 = df1.reset_index()
print (df1)
cars level_1 x y z
0 1 0 1 2 0
1 1 1 1 2 1
2 1 2 1 2 2
3 2 0 1 2 0
4 2 1 1 2 1
5 2 2 1 2 2
6 3 0 1 2 0
7 3 1 1 2 1
8 3 2 1 2 2
ddf1 = dd.from_pandas(df1,npartitions=1)
print (ddf1)
cars level_1 x y z
npartitions=1
0 int64 int64 int64 int64 int64
8 ... ... ... ... ...
Dask Name: from_pandas, 1 tasks
编辑:
L = []
def f(x):
trip = ((1,2,x) for x in range(3))
#append each
L.append(da.from_array(np.array(list(trip)), chunks=(1,3)))
ddf.groupby('cars').apply(f, meta={'x': 'i8', 'y': 'i8', 'z': 'i8'}).compute()
dar = da.concatenate(L, axis=0)
print (dar)
dask.array<concatenate, shape=(12, 3), dtype=int32, chunksize=(1, 3)>
供您编辑:
In [8]: ddf.groupby(['first name', 'last name']).cars.count().compute()
Out[8]:
first name last name
1 1 107
2 107
3 110
2 1 117
2 120
3 99
3 1 119
2 103
3 118
Name: cars, dtype: int64
我正在使用 dask dataframe.groupby().apply() 并获得一个 dask 系列作为 return 值。 我将每个组都列为一个列表三元组,例如 (a,b,1),然后希望将所有三元组变成一个单一的 dask 数据框
我在映射函数的末尾使用这段代码 return 三胞胎作为 dask df
#assume here that trips is a generator for tripletes such as you would produce from itertools.product([l1,l2,l3])
trip = list(itertools.chain.from_iterable(trip))
df = pd.DataFrame.from_records(trip)
return dd.from_pandas(df,npartitions=1)
然后当我尝试使用类似于 pandas concat with dask concatenate
的东西时假设apply函数的结果是变量result。 我正在尝试使用 将 dask.dataframe 导入为 dd dd.concat(结果,轴=0
得到错误
raise TypeError("dfs must be a list of DataFrames/Series objects") TypeError: dfs must be a list of DataFrames/Series objects
但是当我使用
检查结果类型时print type(result)
我明白了
output: class 'dask.dataframe.core.Series'
将函数应用于一组 dask groupby 对象并将所有结果放入一个数据框中的正确方法是什么?
谢谢
编辑:---------------------------------------- ------------------ 为了产生用例,假设这个假数据生成
import random
import pandas as pd
import dask.dataframe as dd
people = [[random.randint(1,3), random.randint(1,3), random.randint(1,3)] for i in range(1000)]
ddf = dd.from_pandas(pd.DataFrame.from_records(people, columns=["first name", "last name", "cars"]), npartitions=1)
现在我的任务是按名字和姓氏对人进行分组(例如,所有具有相同名字和姓氏的人)然后我需要获得一个新的 dask 数据框,其中将包含每组多少辆汽车有。
假设 apply 函数可以 return 一系列元组列表,例如 [(name,name,cars count),(name,name,cars count)] 或具有相同元组的数据框列 - 姓名、姓名、汽车数量。
是的,我知道可以用另一种方式解决特定用例,但请相信我,我的用例更复杂。但我不能分享数据,也不能生成任何类似的数据。所以让我们使用一个虚拟数据:-)
挑战在于将应用的所有结果连接到单个 dask 数据帧中(pandas 数据帧在这里将是一个问题,数据将不适合内存 - 因此通过 pandas数据框会出问题)
对我来说,如果 apply
的输出是 pandas DataFrame
,那么如果需要,最后转换为 dask DataFrame
:
def f(x):
trip = ((1,2,x) for x in range(3))
df = pd.DataFrame.from_records(trip)
return df
df1 = ddf.groupby('cars').apply(f, meta={'x': 'i8', 'y': 'i8', 'z': 'i8'}).compute()
#only for remove MultiIndex
df1 = df1.reset_index()
print (df1)
cars level_1 x y z
0 1 0 1 2 0
1 1 1 1 2 1
2 1 2 1 2 2
3 2 0 1 2 0
4 2 1 1 2 1
5 2 2 1 2 2
6 3 0 1 2 0
7 3 1 1 2 1
8 3 2 1 2 2
ddf1 = dd.from_pandas(df1,npartitions=1)
print (ddf1)
cars level_1 x y z
npartitions=1
0 int64 int64 int64 int64 int64
8 ... ... ... ... ...
Dask Name: from_pandas, 1 tasks
编辑:
L = []
def f(x):
trip = ((1,2,x) for x in range(3))
#append each
L.append(da.from_array(np.array(list(trip)), chunks=(1,3)))
ddf.groupby('cars').apply(f, meta={'x': 'i8', 'y': 'i8', 'z': 'i8'}).compute()
dar = da.concatenate(L, axis=0)
print (dar)
dask.array<concatenate, shape=(12, 3), dtype=int32, chunksize=(1, 3)>
供您编辑:
In [8]: ddf.groupby(['first name', 'last name']).cars.count().compute()
Out[8]:
first name last name
1 1 107
2 107
3 110
2 1 117
2 120
3 99
3 1 119
2 103
3 118
Name: cars, dtype: int64