Why do Window functions fail with "Window function X does not take a frame specification"?
I'm trying to use Spark 1.4 window functions in pyspark 1.4.1, but I mostly get errors or unexpected results. Here is a very simple example that I think should work:
from pyspark.sql.window import Window
import pyspark.sql.functions as func
l = [(1,101),(2,202),(3,303),(4,404),(5,505)]
df = sqlContext.createDataFrame(l,["a","b"])
wSpec = Window.orderBy(df.a).rowsBetween(-1,1)
df.select(df.a, func.rank().over(wSpec).alias("rank"))
==> Failure org.apache.spark.sql.AnalysisException: Window function rank does not take a frame specification.
df.select(df.a, func.lag(df.b,1).over(wSpec).alias("prev"), df.b, func.lead(df.b,1).over(wSpec).alias("next"))
===> org.apache.spark.sql.AnalysisException: Window function lag does not take a frame specification.;
wSpec = Window.orderBy(df.a)
df.select(df.a, func.rank().over(wSpec).alias("rank"))
===> org.apache.hadoop.hive.ql.exec.UDFArgumentTypeException: One or more arguments are expected.
df.select(df.a, func.lag(df.b,1).over(wSpec).alias("prev"), df.b, func.lead(df.b,1).over(wSpec).alias("next")).collect()
[Row(a=1, prev=None, b=101, next=None), Row(a=2, prev=None, b=202, next=None), Row(a=3, prev=None, b=303, next=None)]
As you can see, if I add the rowsBetween frame specification, neither the rank() nor the lag()/lead() window functions recognize it: "Window function does not take a frame specification".
If I omit the rowsBetween frame specification, lag()/lead() at least don't throw exceptions, but they return unexpected (to me) results: always None. And rank() still doesn't work, albeit with a different exception.

Can anybody help me get my window functions right?
UPDATE
All right, this is starting to look like a pyspark bug. I prepared the same test in pure Spark (Scala, spark-shell):
import sqlContext.implicits._
import org.apache.spark.sql._
import org.apache.spark.sql.types._
val l: List[Tuple2[Int,Int]] = List((1,101),(2,202),(3,303),(4,404),(5,505))
val rdd = sc.parallelize(l).map(i => Row(i._1,i._2))
val schemaString = "a b"
val schema = StructType(schemaString.split(" ").map(fieldName => StructField(fieldName, IntegerType, true)))
val df = sqlContext.createDataFrame(rdd, schema)
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._
val wSpec = Window.orderBy("a").rowsBetween(-1,1)
df.select(df("a"), rank().over(wSpec).alias("rank"))
==> org.apache.spark.sql.AnalysisException: Window function rank does not take a frame specification.;
df.select(df("a"), lag(df("b"),1).over(wSpec).alias("prev"), df("b"), lead(df("b"),1).over(wSpec).alias("next"))
===> org.apache.spark.sql.AnalysisException: Window function lag does not take a frame specification.;
val wSpec = Window.orderBy("a")
df.select(df("a"), rank().over(wSpec).alias("rank")).collect()
====> res10: Array[org.apache.spark.sql.Row] = Array([1,1], [2,2], [3,3], [4,4], [5,5])
df.select(df("a"), lag(df("b"),1).over(wSpec).alias("prev"), df("b"), lead(df("b"),1).over(wSpec).alias("next")).collect()
====> res12: Array[org.apache.spark.sql.Row] = Array([1,null,101,202], [2,101,202,303], [3,202,303,404], [4,303,404,505], [5,404,505,null])
Even though rowsBetween cannot be applied in Scala either, both rank() and lag()/lead() work as expected when rowsBetween is omitted.
As far as I can tell there are two different problems here. Window frame definitions are simply not supported by the Hive GenericUDAFRank, GenericUDAFLag and GenericUDAFLead functions, so the errors you see are the expected behavior.
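Frame specifications only make sense for true aggregates such as avg or sum; rank and lag/lead always look at fixed positions, so a frame is meaningless for them. For contrast, here is a minimal sketch where rowsBetween is legitimate (untested on 1.4.1, reusing the df from the question; the moving_avg alias is just illustrative):

from pyspark.sql.window import Window
import pyspark.sql.functions as func

# A sliding frame is valid here because avg is a genuine aggregate,
# unlike rank/lag/lead, which operate on fixed row positions.
# Empty partitionBy() sidesteps the PySpark issue described below.
movingAvg = Window.partitionBy().orderBy(df.a).rowsBetween(-1, 1)
df.select(df.a, df.b, func.avg(df.b).over(movingAvg).alias("moving_avg")).collect()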
Regarding the problem with the following PySpark code:
wSpec = Window.orderBy(df.a)
df.select(df.a, func.rank().over(wSpec).alias("rank"))
it looks like it is related to my question and should be addressed by SPARK-9978. Until then, you can make it work by changing the window definition to this:
wSpec = Window.partitionBy().orderBy(df.a)
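With that change, and with the frame specification dropped, the minimal example from the question should behave like the Scala version above. A quick sketch putting the pieces together:

from pyspark.sql.window import Window
import pyspark.sql.functions as func

# No rowsBetween: rank/lag/lead do not take a frame specification.
# Empty partitionBy() works around SPARK-9978 in pyspark 1.4.1.
wSpec = Window.partitionBy().orderBy(df.a)
df.select(df.a,
          func.rank().over(wSpec).alias("rank"),
          func.lag(df.b, 1).over(wSpec).alias("prev"),
          df.b,
          func.lead(df.b, 1).over(wSpec).alias("next")).collect()
# Expected to match the Scala results above, e.g. row a=2 -> rank=2, prev=101, next=303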