使用 Dask DataFrames 对组执行任意操作的最佳方法

Best way to perform arbitrary operations on groups with Dask DataFrames

我想使用 Dask 进行表单的操作

df.groupby(some_columns).apply(some_function)

其中 some_function() 可能会计算一些汇总统计数据、执行时间序列预测,甚至只是将组保存到 AWS S3 中的单个文件中。

Dask documentation 状态(以及其他几个 Whosebug 答案引用)groupby-apply 不适合聚合:

Pandas’ groupby-apply can be used to to apply arbitrary functions, including aggregations that result in one row per group. Dask’s groupby-apply will apply func once to each partition-group pair, so when func is a reduction you’ll end up with one row per partition-group pair. To apply a custom aggregation with Dask, use dask.dataframe.groupby.Aggregation.

不清楚Aggregation是否支持多列操作。然而,this DataFrames tutorial 似乎完全符合我的建议,大致 some_function = lambda x: LinearRegression().fit(...)。该示例似乎按预期工作,到目前为止,我同样没有遇到任何问题,例如some_function = lambda x: x.to_csv(...).

在什么情况下我可以预期 some_function 将传递组的所有行?如果永远无法保证,是否有办法打破 LinearRegression 示例?最重要的是,处理这些用例的最佳方式是什么?

这是推测性的,但解决 partition-group 导致单个组中的行跨分区拆分的情况的一种方法可能是明确地重新分区数据,以确保每个组都是与唯一分区关联。

实现这一点的一种方法是创建一个与组标识符列相同的索引。这通常不是 cheap operation,但可以通过以组标识符已排序的方式预处理数据来提供帮助。

当前版本的文档和源代码似乎不同步。具体来说,在 dask.groupby 的源代码中,有这样一条消息:

Dask groupby supports reductions, i.e., mean, sum and alike, and apply. The former do not shuffle the data and are efficiently implemented as tree reductions. The latter is implemented by shuffling the underlying partiitons such that all items of a group can be found in the same parititon.

这与文档中关于 partition-group 的警告不一致。下面的代码片段和任务图可视化还表明存在数据混洗以确保分区包含同一组的所有成员:

import dask.dataframe as dd
import pandas as pd

df = pd.DataFrame({'group': [0,0,1,1,2,0,1,2], 'npeople': [1,2,3,4,5,6,7,8]})
ddf = dd.from_pandas(df, npartitions=2)

def myfunc(df):
    return df['npeople'].sum()

results_pandas = df.groupby('group').apply(myfunc).sort_index()
results_dask = ddf.groupby('group').apply(myfunc).compute().sort_index()

print(sum(results_dask != results_pandas))
# will print 0, so there are no differences
# between dask and pandas groupby