Convert Spark SQL to Scala using Window function partitioned by aggregate
I have the following Spark SQL query:
val subquery =
  "( select garment_group_name, prod_name, " +
  "row_number() over (partition by garment_group_name order by count(prod_name) desc) as seqnum " +
  "from articles a1 " +
  "group by garment_group_name, prod_name )"
val query = "SELECT garment_group_name, prod_name " +
  "FROM " + subquery +
  " WHERE seqnum = 1 "
val query3 = spark.sql(query)
I'm trying to do the exact same thing with the DataFrame API. I want to focus on the subquery part first, so I tried something like this:
import org.apache.spark.sql.expressions.Window // imports the needed Window object
import org.apache.spark.sql.functions.row_number
val windowSpec = Window.partitionBy("garment_group_name")
articlesDF.withColumn("row_number", row_number.over(windowSpec))
.show()
But I get the following error:
org.apache.spark.sql.AnalysisException: Window function row_number() requires window to be ordered, please add ORDER BY clause. For example SELECT row_number()(value_expr) OVER (PARTITION BY window_partition ORDER BY window_ordering) from table;
at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$class.failAnalysis(CheckAnalysis.scala:42)
at org.apache.spark.sql.catalyst.analysis.Analyzer.failAnalysis(Analyzer.scala:95)
at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveWindowOrder$$anonfun$apply.applyOrElse(Analyzer.scala:2207)......... and so on.
I see that I need to include an orderBy clause, but how do I do that when what I really start from is a count grouped by two columns, ordered by that count? The error message gives the example SELECT row_number()(value_expr) OVER (PARTITION BY window_partition ORDER BY window_ordering) from table, but I don't know how to express this with the DataFrame API, and I haven't found an example online.
The solution is to first compute count("prod_name") over a Window partitioned by both "garment_group_name" and "prod_name", and then use that count as the ordering in windowSpec.
Starting with some example data:
import spark.implicits._ // needed for toDF on a local collection
val df = List(
  ("a", "aa1"), ("a", "aa2"), ("a", "aa3"), ("b", "bb")
).toDF("garment_group_name", "prod_name")
df.show(false)
which gives:
+------------------+---------+
|garment_group_name|prod_name|
+------------------+---------+
|a |aa1 |
|a |aa2 |
|a |aa3 |
|b |bb |
+------------------+---------+
And the two window specifications we need:
import org.apache.spark.sql.expressions.Window
// count and row_number are used below, so import them alongside col
import org.apache.spark.sql.functions.{col, count, row_number}
val countWindowSpec = Window.partitionBy("garment_group_name", "prod_name")
val windowSpec = Window.partitionBy(col("garment_group_name")).orderBy(col("count").desc)
Then we can use them:
df
// create the `count` column to be used by `windowSpec`
.withColumn("count", count(col("prod_name")).over(countWindowSpec))
.withColumn("seqnum", row_number.over(windowSpec))
// take only the first row of each partition
.filter(col("seqnum") === 1)
// select only the rows we care about
.select("garment_group_name", "prod_name")
.show(false)
which gives:
+------------------+---------+
|garment_group_name|prod_name|
+------------------+---------+
|a |aa1 |
|b |bb |
+------------------+---------+
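One caveat: within group a all three products tie with a count of 1, so row_number assigns rank 1 to one of them arbitrarily; the output above happens to show aa1, but Spark makes no ordering guarantee among ties. If you need a deterministic winner, one option (a sketch; the alphabetical tiebreak on prod_name is just an assumption about what you'd want) is to extend the ordering:
// Sketch: break ties deterministically by also ordering on prod_name
val deterministicWindowSpec = Window
  .partitionBy(col("garment_group_name"))
  .orderBy(col("count").desc, col("prod_name").asc)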
Comparing this with your SQL implementation, using the same df:
df.createOrReplaceTempView("a1")
val subquery =
  "( select garment_group_name, prod_name, " +
  "row_number() over (partition by garment_group_name order by count(prod_name) desc) as seqnum " +
  "from a1 " +
  "group by garment_group_name, prod_name )"
val query = "SELECT garment_group_name, prod_name " +
  "FROM " + subquery +
  " WHERE seqnum = 1 "
spark.sql(query).show(false)
we get the same result:
+------------------+---------+
|garment_group_name|prod_name|
+------------------+---------+
|a |aa1 |
|b |bb |
+------------------+---------+
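As an aside, you can also mirror the SQL's GROUP BY more directly by aggregating first and then ranking, instead of computing the count over a second window. A sketch of an equivalent alternative, reusing the windowSpec and imports from above:
df.groupBy("garment_group_name", "prod_name")
  // aggregate first, mirroring the SQL GROUP BY
  .agg(count("prod_name").as("count"))
  // then rank within each garment_group_name by that count
  .withColumn("seqnum", row_number.over(windowSpec))
  .filter(col("seqnum") === 1)
  .select("garment_group_name", "prod_name")
  .show(false)
This should produce the same two rows as above, subject to the same tie-breaking caveat.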