Spark 3.0.1 是否支持 window 函数上的自定义聚合器?
Does Spark 3.0.1 support custom Aggregators on window functions?
我编写了自定义 Aggregator
(org.apache.spark.sql.expressions.Aggregator
的扩展),Spark 在 group by
语句下将其作为聚合函数正确调用:
sparkSession
.createDataFrame(...)
.groupBy(col("id"))
.agg(
new MyCustomAggregator().toColumn().name("aggregation_result"))
.show();
不过我想在 window 函数中使用它,因为顺序对我来说很重要。
我试过这样调用它:
sparkSession
.createDataFrame(...)
.withColumn("aggregation_result", new MyCustomAggregator().toColumn().over(Window
.partitionBy(col("id"))
.orderBy(col("order"))))
.show();
这是我得到的错误:
org.apache.spark.sql.AnalysisException: cannot resolve '(PARTITION BY `id` ORDER BY `order` ASC NULLS FIRST unspecifiedframe$())' due to data type mismatch: Cannot use an UnspecifiedFrame. This should have been converted during analysis. Please file a bug report.
是否可以在 Spark 3.0.1 中使用自定义聚合器作为 window 函数?
如果是这样,我在这里缺少什么?
是的,Spark 3 确实支持自定义聚合器作为 window 函数。
这里是 Java 代码:
UserDefinedFunction myCustomAggregation = functions.udaf(new MyCustomAggregator(), Encoders.bean(AggregationInput.class));
sparkSession
.createDataFrame(...)
.withColumn("aggregation_result", myCustomAggregation.apply(col("aggregation_input1"), col("aggregation_input2")).over(Window
.partitionBy(col("id"))
.orderBy(col("order"))))
.show();
AggregationInput
这是一个简单的 DTO,其中包含聚合函数所需的行元素。
因此,无论您是在 group by
下聚合还是作为 window 函数聚合,您仍然希望使用 org.apache.spark.sql.expressions.Aggregator
。
我编写了自定义 Aggregator
(org.apache.spark.sql.expressions.Aggregator
的扩展),Spark 在 group by
语句下将其作为聚合函数正确调用:
sparkSession
.createDataFrame(...)
.groupBy(col("id"))
.agg(
new MyCustomAggregator().toColumn().name("aggregation_result"))
.show();
不过我想在 window 函数中使用它,因为顺序对我来说很重要。 我试过这样调用它:
sparkSession
.createDataFrame(...)
.withColumn("aggregation_result", new MyCustomAggregator().toColumn().over(Window
.partitionBy(col("id"))
.orderBy(col("order"))))
.show();
这是我得到的错误:
org.apache.spark.sql.AnalysisException: cannot resolve '(PARTITION BY `id` ORDER BY `order` ASC NULLS FIRST unspecifiedframe$())' due to data type mismatch: Cannot use an UnspecifiedFrame. This should have been converted during analysis. Please file a bug report.
是否可以在 Spark 3.0.1 中使用自定义聚合器作为 window 函数? 如果是这样,我在这里缺少什么?
是的,Spark 3 确实支持自定义聚合器作为 window 函数。
这里是 Java 代码:
UserDefinedFunction myCustomAggregation = functions.udaf(new MyCustomAggregator(), Encoders.bean(AggregationInput.class));
sparkSession
.createDataFrame(...)
.withColumn("aggregation_result", myCustomAggregation.apply(col("aggregation_input1"), col("aggregation_input2")).over(Window
.partitionBy(col("id"))
.orderBy(col("order"))))
.show();
AggregationInput
这是一个简单的 DTO,其中包含聚合函数所需的行元素。
因此,无论您是在 group by
下聚合还是作为 window 函数聚合,您仍然希望使用 org.apache.spark.sql.expressions.Aggregator
。