Dask 中的 GroupBy /Map_partitions
GroupBy /Map_partitions in Dask
我有一个有 2438 个分区的 dask 数据框,每个分区是 1.1GB 总共 7B 行
我想对多列进行分组并聚合其中一列
agg = {'total_x':'sum'}
df_s = df_s.map_partitions(lambda dff: dff.groupby(['y', 'z', 'a', 'b','c']).agg(agg) , meta=pd.DataFrame({'y':'int','z':'int', 'a':'int', 'b':'int','c':'object' ,'total_x':'f64'}))
我收到错误 If using all scalar values, you must pass an index
我该如何解决?
我有 160 GB RAM 的 RAM 和 24 个工作人员,在那种环境下甚至可以进行计算吗?
如果不行,还有什么可行的方法?
正如@Michael Delgado 所建议的,meta
定义存在问题。这应该修复 meta:
的定义
import pandas as pd
dtypes = {
"y": "int",
"z": "int",
"a": "int",
"b": "int",
"c": "object",
"total_x": "f64",
}
meta = pd.DataFrame(columns=dtypes.keys())
然后,这个 meta
可以作为 kwarg 传递。请参阅下面的可重现示例:
import dask
import pandas as pd
dtypes = {"name": "str", "x": "f64"}
meta = pd.DataFrame(columns=dtypes.keys())
agg = {"x": "sum"}
ddf = dask.datasets.timeseries().map_partitions(
lambda df: df.groupby(["name"], as_index=False).agg(agg), meta=meta
)
ddf.head()
我有一个有 2438 个分区的 dask 数据框,每个分区是 1.1GB 总共 7B 行 我想对多列进行分组并聚合其中一列
agg = {'total_x':'sum'}
df_s = df_s.map_partitions(lambda dff: dff.groupby(['y', 'z', 'a', 'b','c']).agg(agg) , meta=pd.DataFrame({'y':'int','z':'int', 'a':'int', 'b':'int','c':'object' ,'total_x':'f64'}))
我收到错误 If using all scalar values, you must pass an index
我该如何解决? 我有 160 GB RAM 的 RAM 和 24 个工作人员,在那种环境下甚至可以进行计算吗?
如果不行,还有什么可行的方法?
正如@Michael Delgado 所建议的,meta
定义存在问题。这应该修复 meta:
import pandas as pd
dtypes = {
"y": "int",
"z": "int",
"a": "int",
"b": "int",
"c": "object",
"total_x": "f64",
}
meta = pd.DataFrame(columns=dtypes.keys())
然后,这个 meta
可以作为 kwarg 传递。请参阅下面的可重现示例:
import dask
import pandas as pd
dtypes = {"name": "str", "x": "f64"}
meta = pd.DataFrame(columns=dtypes.keys())
agg = {"x": "sum"}
ddf = dask.datasets.timeseries().map_partitions(
lambda df: df.groupby(["name"], as_index=False).agg(agg), meta=meta
)
ddf.head()