Spark 3.0.1 是否支持 window 函数上的自定义聚合器?

Does Spark 3.0.1 support custom Aggregators on window functions?

我编写了自定义 Aggregatororg.apache.spark.sql.expressions.Aggregator 的扩展),Spark 在 group by 语句下将其作为聚合函数正确调用:

sparkSession
    .createDataFrame(...)
    .groupBy(col("id"))
    .agg(
        new MyCustomAggregator().toColumn().name("aggregation_result"))
    .show();

不过我想在 window 函数中使用它,因为顺序对我来说很重要。 我试过这样调用它:

sparkSession
    .createDataFrame(...)
    .withColumn("aggregation_result", new MyCustomAggregator().toColumn().over(Window
        .partitionBy(col("id"))
        .orderBy(col("order"))))
    .show();

这是我得到的错误:

org.apache.spark.sql.AnalysisException: cannot resolve '(PARTITION BY `id` ORDER BY `order` ASC NULLS FIRST unspecifiedframe$())' due to data type mismatch: Cannot use an UnspecifiedFrame. This should have been converted during analysis. Please file a bug report.

是否可以在 Spark 3.0.1 中使用自定义聚合器作为 window 函数? 如果是这样,我在这里缺少什么?

是的,Spark 3 确实支持自定义聚合器作为 window 函数。

这里是 Java 代码:

UserDefinedFunction myCustomAggregation = functions.udaf(new MyCustomAggregator(), Encoders.bean(AggregationInput.class));

sparkSession
    .createDataFrame(...)
    .withColumn("aggregation_result", myCustomAggregation.apply(col("aggregation_input1"), col("aggregation_input2")).over(Window
        .partitionBy(col("id"))
        .orderBy(col("order"))))
    .show();

AggregationInput 这是一个简单的 DTO,其中包含聚合函数所需的行元素。

因此,无论您是在 group by 下聚合还是作为 window 函数聚合,您仍然希望使用 org.apache.spark.sql.expressions.Aggregator