Convert Spark SQL to Scala using Window function partitioned by aggregate

I have the following Spark SQL query:

val subquery = 
        "( select garment_group_name , prod_name, " +
        "row_number() over (partition by garment_group_name order by count(prod_name) desc) as seqnum " +
        "from articles a1 " +
        "group by garment_group_name, prod_name )"


val query = "SELECT garment_group_name, prod_name " +
            "FROM " + subquery +
            " WHERE seqnum = 1 "


val query3 = spark.sql(query)

I am trying to do the exact same thing with the DataFrame API. I want to focus on just the subquery part first, so I did something like this:

import org.apache.spark.sql.expressions.Window // imports the needed Window object
import org.apache.spark.sql.functions.row_number

val windowSpec = Window.partitionBy("garment_group_name")

articlesDF.withColumn("row_number", row_number.over(windowSpec))
    .show()

But I get the following error:

org.apache.spark.sql.AnalysisException: Window function row_number() requires window to be ordered, please add ORDER BY clause. For example SELECT row_number()(value_expr) OVER (PARTITION BY window_partition ORDER BY window_ordering) from table;
  at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$class.failAnalysis(CheckAnalysis.scala:42)
  at org.apache.spark.sql.catalyst.analysis.Analyzer.failAnalysis(Analyzer.scala:95)
  at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveWindowOrder$$anonfun$apply.applyOrElse(Analyzer.scala:2207)......... and so on.

I see that I need to include an orderBy clause, but how do I do this when I actually start by grouping on two columns, taking a count, and then ordering by that count?

The error message gives the example: SELECT row_number()(value_expr) OVER (PARTITION BY window_partition ORDER BY window_ordering) from table, but I don't know how to express this with the DataFrame API, and I haven't found it online either.

The solution is to first compute count("prod_name") over a Window partitioned by both "garment_group_name" and "prod_name", and then have the windowSpec that is partitioned by "garment_group_name" order by that count.

Starting with some sample data:

// needed outside spark-shell so that .toDF works on a local collection
import spark.implicits._

val df = List(
  ("a", "aa1"), ("a", "aa2"), ("a", "aa3"), ("b", "bb")
)
.toDF("garment_group_name", "prod_name")

df.show(false)

which gives:

+------------------+---------+
|garment_group_name|prod_name|
+------------------+---------+
|a                 |aa1      |
|a                 |aa2      |
|a                 |aa3      |
|b                 |bb       |
+------------------+---------+

And the two window specs we need: countWindowSpec to count the rows of each (garment_group_name, prod_name) pair, and windowSpec to rank those counts within each garment_group_name:

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{col, count, row_number}

val countWindowSpec = Window.partitionBy("garment_group_name", "prod_name")
val windowSpec      = Window.partitionBy(col("garment_group_name")).orderBy(col("count").desc)

Then we can use them:

df
    // create the `count` column to be used by `windowSpec`
    .withColumn("count", count(col("prod_name")).over(countWindowSpec))
    .withColumn("seqnum", row_number.over(windowSpec))
    // take only the first row of each partition
    .filter(col("seqnum") === 1)
    // select only the rows we care about
    .select("garment_group_name", "prod_name")
    .show(false)

which gives:

+------------------+---------+
|garment_group_name|prod_name|
+------------------+---------+
|a                 |aa1      |
|b                 |bb       |
+------------------+---------+

Compare this with your SQL implementation, using the same df:

df.createOrReplaceTempView("a1")

val subquery = 
        "( select garment_group_name , prod_name, " +
        "row_number() over (partition by garment_group_name order by count(prod_name) desc) as seqnum " +
        "from a1 " +
        "group by garment_group_name, prod_name )"

val query = "SELECT garment_group_name, prod_name " +
            "FROM " + subquery +
            " WHERE seqnum = 1 "

spark.sql(query).show(false)

and we get the same result:

+------------------+---------+
|garment_group_name|prod_name|
+------------------+---------+
|a                 |aa1      |
|b                 |bb       |
+------------------+---------+
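
For comparison, a sketch that mirrors the SQL more literally (GROUP BY first, then the ranking window) is also possible with the DataFrame API; it reuses the df and column names from above:

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{col, count, row_number}

df
    // mirror the SQL's GROUP BY: one row per (garment_group_name, prod_name) with its count
    .groupBy("garment_group_name", "prod_name")
    .agg(count("prod_name").as("count"))
    // rank the pairs within each garment_group_name by descending count
    .withColumn("seqnum", row_number.over(
        Window.partitionBy("garment_group_name").orderBy(col("count").desc)))
    .filter(col("seqnum") === 1)
    .select("garment_group_name", "prod_name")
    .show(false)

Note that in both versions row_number breaks ties arbitrarily when two products have the same count; use rank instead if you want to keep all tied products.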