AnalysisException: Expression not supported within a window function while calculating column value based on previous rows
I have sample data with four fields: amt1, amt2, amt3 and amt4. I want to compute amt5 from the sum of the fields (amt1, amt2, amt3, amt4) plus the previous row's amt5 value.
Suppose the dataset is as follows:
+----+----+----+----+---+
|amt1|amt2|amt3|amt4|ids|
+----+----+----+----+---+
|   1|   2|   3|   4|  1|
|   1|   2|   3|   4|  2|
|   1|   2|   3|   4|  3|
|   1|   2|   3|   4|  4|
|   1|   2|   3|   4|  5|
|   1|   2|   3|   4|  6|
+----+----+----+----+---+
Below is my expected output (for example, amt5 in row 2 is the row total 1+2+3+4 = 10 plus the previous row's amt5 of 10, giving 20):
+----+----+----+----+---+----+
|amt1|amt2|amt3|amt4|ids|amt5|
+----+----+----+----+---+----+
|   1|   2|   3|   4|  1|  10|
|   1|   2|   3|   4|  2|  20|
|   1|   2|   3|   4|  3|  30|
|   1|   2|   3|   4|  4|  40|
+----+----+----+----+---+----+
Below is the code I am running:
from pyspark.sql import Row
from pyspark.sql.window import Window
import pyspark.sql.functions as func
def sum(*col):
    sum = 0
    for i in col:
        sum = sum + i
    return sum
rdd = sc.parallelize(["1,1,2,3,4", "2,1,2,3,4", "3,1,2,3,4", "4,1,2,3,4", "5,1,2,3,4", "6,1,2,3,4"])
finalRdd = rdd.map(lambda t: t.split(",")).map(lambda t: Row(ids=t[0],amt1=t[1],amt2=t[2],amt3=t[3],amt4=t[4]))
df = spark.createDataFrame(finalRdd)
w = Window.orderBy("ids").rowsBetween(
        Window.unboundedPreceding, # Take all rows from the beginning of frame
        Window.currentRow)         # To current row
df1 = df.withColumn("amt5",sum(df.amt1,df.amt2,df.amt3,df.amt4))
df1.withColumn("amt5",sum(df1.amt5).over(w)).show()
Below is the exception I get after executing the above code:
py4j.protocol.Py4JJavaError: An error occurred while calling o121.withColumn.
: org.apache.spark.sql.AnalysisException: Expression '(amt5#11 + cast(0 as double))' not supported within a window function.;;
Project [amt1#0, amt2#1, amt3#2, amt4#3, ids#4, amt5#11, total#19]
+- Project [amt1#0, amt2#1, amt3#2, amt4#3, ids#4, amt5#11, total#19, total#19]
+- Window [(amt5#11 + cast(0 as double)) windowspecdefinition(ids#4 ASC NULLS FIRST, ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS total#19], [ids#4 ASC NULLS FIRST]
+- Project [amt1#0, amt2#1, amt3#2, amt4#3, ids#4, amt5#11]
+- Project [amt1#0, amt2#1, amt3#2, amt4#3, ids#4, ((((cast(amt1#0 as double) + cast(0 as double)) + cast(amt2#1 as double)) + cast(amt3#2 as double)) + cast(amt4#3 as double)) AS amt5#11]
+- LogicalRDD [amt1#0, amt2#1, amt3#2, amt4#3, ids#4]
You have a naming collision with your sum function: the one you defined shadows the window aggregate, which should come from the pyspark.sql.functions package. Call it like this instead:
df1.withColumn("amt5",func.sum(df1.amt5).over(w)).show()