一次性对 spark 数据帧执行多个聚合,而不是多次慢速连接

Perform multiple aggregations on a spark dataframe in one pass instead of multiple slow joins

目前我有 9 个函数可以对数据框进行特定计算 - 包括每月平均余额、滚动损益、期初余额、比率计算。

这些函数中的每一个都会产生以下结果: 第一列是函数接受的按列分组,最后一列是统计计算。

这些函数中的每一个都会生成一个 spark 数据框,该数据框具有相同的变量分组(相同的第一列 - 如果变量分组仅为 1,则为 1 列;如果变量分组为 2,则为 2 列,等等)和 1 列,其中值是特定计算 - 我在开头列出的示例。

因为这些函数中的每一个都进行不同的计算,所以我需要为每个函数生成一个数据框,然后将它们连接起来生成一个报告

我按变量将它们加入组中,因为它们在所有变量(每个单独的统计报告)中都很常见。

但是进行 7-8 甚至更多的连接非常慢。

有没有办法在不使用连接的情况下将这些列加在一起?

谢谢。

我可以想到多种方法。但这看起来像是新 pandas udf spark api.

的一个很好的用例

您可以定义一个group_by udf。 udf 将接收聚合组作为 pandas 数据帧。您在组上应用 9 个聚合函数,return 一个 pandas 数据框具有额外的 9 个聚合列。 Spark 会将每个新的 returned pandas 数据帧组合成一个大的 spark 数据帧。

例如

# given you want to aggregate average and ratio
@pandas_udf("month long, avg double, ratio dbl", PandasUDFType.GROUPED_MAP)
def compute(pdf):
    # pdf is a pandas.DataFrame
    pdf['avg'] = compute_avg(pdf)
    pdf['ratio'] = compute_ratio(pdf)
    return pdf

df.groupby("month").apply(compute).show()

Pandas-UDF#Grouped-Map

如果您的集群使用的是较低版本,您有 2 个选择:

  1. 坚持使用数据框 api 并编写自定义聚合函数。 。他们有一个可怕的 api 但用法看起来像这样。
df.groupBy(df.month).agg(
  my_avg_func(df.amount).alias('avg'),
  my_ratio_func(df.amount).alias('ratio'),
  1. 回退到好的 ol'rdd map reduce api
 #pseudocode
 def _create_tuple_key(record):
    return (record.month, record)
 def _compute_stats(acc, record):
    acc['count'] += 1
    acc['avg'] =  _accumulate_avg(acc['count'], record)
    acc['ratio'] =  _accumulate_ratio(acc['count'], record)
    return acc
 df.toRdd.map(__create_tuple_key).reduceByKey(_compute_stats)