具有嵌套列的 Apache Spark Window 函数
Apache Spark Window function with nested column
我不确定这是一个错误(或语法不正确)。我四处搜索,没有看到其他地方提到过这个,所以我在提交错误报告之前先在这里问一下。
我正在尝试使用在嵌套列上分区的 Window 函数。我在下面创建了一个小示例来演示该问题。
import sqlContext.implicits._
import org.apache.spark.sql.functions._
import org.apache.spark.sql.expressions.Window
val data = Seq(("a", "b", "c", 3), ("c", "b", "a", 3)).toDF("A", "B", "C", "num")
.withColumn("Data", struct("A", "B", "C")).drop("A").drop("B").drop("C")
val winSpec = Window.partitionBy("Data.A", "Data.B").orderBy($"num".desc)
data.select($"*", max("num").over(winSpec) as "max").where("num = max").drop("max").show
以上导致错误
org.apache.spark.sql.AnalysisException: resolved attribute(s) A#39,B#40 missing from num#33,Data#37 in operator !Project [num#33,Data#37,A#39,B#40];
at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$class.failAnalysis(CheckAnalysis.scala:38)
at org.apache.spark.sql.catalyst.analysis.Analyzer.failAnalysis(Analyzer.scala:44)
...
如果这些列不是嵌套的,则可以正常工作。
我是不是遗漏了什么语法,或者这是一个错误?
在我看来,当分析器试图扩展 *
时,你遇到了一个错误
import sqlContext.implicits._
import org.apache.spark.sql.functions._
import org.apache.spark.sql.expressions.Window
sql("SET spark.sql.eagerAnalysis=false") // Let us see the error even though we are constructing an invalid tree
val data = Seq(("a", "b", "c", 3), ("c", "b", "a", 3)).toDF("A", "B", "C", "num")
.withColumn("Data", struct("A", "B", "C"))
.drop("A")
.drop("B")
.drop("C")
val winSpec = Window.partitionBy("Data.A", "Data.B").orderBy($"num".desc)
data.select($"*", max("num").over(winSpec) as "max").explain(true)
通过关闭 eager analysis(这样我们就可以调用 explain 而不会抛出错误),您可以看到“*”正在扩展以包含实际上不可用的列:
== Parsed Logical Plan ==
'Project [*,'max('num) windowspecdefinition('Data.A,'Data.B,'num DESC,UnspecifiedFrame) AS max#64928]
+- Project [num#64926,Data#64927]
+- Project [C#64925,num#64926,Data#64927]
+- Project [B#64924,C#64925,num#64926,Data#64927]
+- Project [A#64923,B#64924,C#64925,num#64926,struct(A#64923,B#64924,C#64925) AS Data#64927]
+- Project [_1#64919 AS A#64923,_2#64920 AS B#64924,_3#64921 AS C#64925,_4#64922 AS num#64926]
+- LocalRelation [_1#64919,_2#64920,_3#64921,_4#64922], [[a,b,c,3],[c,b,a,3]]
== Analyzed Logical Plan ==
num: int, Data: struct<A:string,B:string,C:string>, max: int
Project [num#64926,Data#64927,max#64928]
+- Project [num#64926,Data#64927,A#64932,B#64933,max#64928,max#64928]
+- Window [num#64926,Data#64927,A#64932,B#64933], [HiveWindowFunction#org.apache.hadoop.hive.ql.udf.generic.GenericUDAFMax(num#64926) windowspecdefinition(A#64932,B#64933,num#64926 DESC,RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS max#64928], [A#64932,B#64933], [num#64926 DESC]
+- !Project [num#64926,Data#64927,A#64932,B#64933]
+- Project [num#64926,Data#64927]
+- Project [C#64925,num#64926,Data#64927]
+- Project [B#64924,C#64925,num#64926,Data#64927]
+- Project [A#64923,B#64924,C#64925,num#64926,struct(A#64923,B#64924,C#64925) AS Data#64927]
+- Project [_1#64919 AS A#64923,_2#64920 AS B#64924,_3#64921 AS C#64925,_4#64922 AS num#64926]
+- LocalRelation [_1#64919,_2#64920,_3#64921,_4#64922], [[a,b,c,3],[c,b,a,3]]
我已在此处提交:https://issues.apache.org/jira/browse/SPARK-12989。
如果您手动列出列而不是使用 *
应该作为解决方法。
我不确定这是一个错误(或语法不正确)。我四处搜索,没有看到其他地方提到过这个,所以我在提交错误报告之前先在这里问一下。
我正在尝试使用在嵌套列上分区的 Window 函数。我在下面创建了一个小示例来演示该问题。
import sqlContext.implicits._
import org.apache.spark.sql.functions._
import org.apache.spark.sql.expressions.Window
val data = Seq(("a", "b", "c", 3), ("c", "b", "a", 3)).toDF("A", "B", "C", "num")
.withColumn("Data", struct("A", "B", "C")).drop("A").drop("B").drop("C")
val winSpec = Window.partitionBy("Data.A", "Data.B").orderBy($"num".desc)
data.select($"*", max("num").over(winSpec) as "max").where("num = max").drop("max").show
以上导致错误
org.apache.spark.sql.AnalysisException: resolved attribute(s) A#39,B#40 missing from num#33,Data#37 in operator !Project [num#33,Data#37,A#39,B#40];
at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$class.failAnalysis(CheckAnalysis.scala:38)
at org.apache.spark.sql.catalyst.analysis.Analyzer.failAnalysis(Analyzer.scala:44)
...
如果这些列不是嵌套的,则可以正常工作。 我是不是遗漏了什么语法,或者这是一个错误?
在我看来,当分析器试图扩展 *
import sqlContext.implicits._
import org.apache.spark.sql.functions._
import org.apache.spark.sql.expressions.Window
sql("SET spark.sql.eagerAnalysis=false") // Let us see the error even though we are constructing an invalid tree
val data = Seq(("a", "b", "c", 3), ("c", "b", "a", 3)).toDF("A", "B", "C", "num")
.withColumn("Data", struct("A", "B", "C"))
.drop("A")
.drop("B")
.drop("C")
val winSpec = Window.partitionBy("Data.A", "Data.B").orderBy($"num".desc)
data.select($"*", max("num").over(winSpec) as "max").explain(true)
通过关闭 eager analysis(这样我们就可以调用 explain 而不会抛出错误),您可以看到“*”正在扩展以包含实际上不可用的列:
== Parsed Logical Plan ==
'Project [*,'max('num) windowspecdefinition('Data.A,'Data.B,'num DESC,UnspecifiedFrame) AS max#64928]
+- Project [num#64926,Data#64927]
+- Project [C#64925,num#64926,Data#64927]
+- Project [B#64924,C#64925,num#64926,Data#64927]
+- Project [A#64923,B#64924,C#64925,num#64926,struct(A#64923,B#64924,C#64925) AS Data#64927]
+- Project [_1#64919 AS A#64923,_2#64920 AS B#64924,_3#64921 AS C#64925,_4#64922 AS num#64926]
+- LocalRelation [_1#64919,_2#64920,_3#64921,_4#64922], [[a,b,c,3],[c,b,a,3]]
== Analyzed Logical Plan ==
num: int, Data: struct<A:string,B:string,C:string>, max: int
Project [num#64926,Data#64927,max#64928]
+- Project [num#64926,Data#64927,A#64932,B#64933,max#64928,max#64928]
+- Window [num#64926,Data#64927,A#64932,B#64933], [HiveWindowFunction#org.apache.hadoop.hive.ql.udf.generic.GenericUDAFMax(num#64926) windowspecdefinition(A#64932,B#64933,num#64926 DESC,RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS max#64928], [A#64932,B#64933], [num#64926 DESC]
+- !Project [num#64926,Data#64927,A#64932,B#64933]
+- Project [num#64926,Data#64927]
+- Project [C#64925,num#64926,Data#64927]
+- Project [B#64924,C#64925,num#64926,Data#64927]
+- Project [A#64923,B#64924,C#64925,num#64926,struct(A#64923,B#64924,C#64925) AS Data#64927]
+- Project [_1#64919 AS A#64923,_2#64920 AS B#64924,_3#64921 AS C#64925,_4#64922 AS num#64926]
+- LocalRelation [_1#64919,_2#64920,_3#64921,_4#64922], [[a,b,c,3],[c,b,a,3]]
我已在此处提交:https://issues.apache.org/jira/browse/SPARK-12989。
如果您手动列出列而不是使用 *
应该作为解决方法。