Spark Windowsspec 滞后函数计算累积分数
Spark Windowspec lag function calculating cumulative scores
我有一个包含每天分数的数据框,我想计算每个用户的累积 运行 分数。我需要在新列上将前一天的累积分数与今天的分数相加,我尝试了 lag
函数进行计算,但由于某些原因它给出了错误。
这是我试过的代码:
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._
val genre = sc.parallelize(List(("Alice", "2016-05-01", "action",0),
("Alice", "2016-05-02", "0",1),
("Alice", "2016-05-03", "comedy",0),
("Alice", "2016-05-04", "action",1),
("Alice", "2016-05-05", "action",0),
("Alice", "2016-05-06", "horror",1),
("Bob", "2016-05-01", "art",0),
("Bob", "2016-05-02", "0",1),
("Bob", "2016-05-03", "0",0),
("Bob", "2016-05-04", "art",0),
("Bob", "2016-05-05", "comedy",1),
("Bob", "2016-05-06", "action",0))).
toDF("name", "date", "genre","score")
val wSpec2 = Window.partitionBy("name","genre").orderBy("date").rowsBetween(Long.MinValue, 0)
genre.withColumn( "CumScore",genre("score")*2+ lag(("CumScore"),1).over(wSpec2)*2 ).show()
数据框:
-----+----------+------+-----+
| name| date| genre|score|
+-----+----------+------+-----+
|Alice|2016-05-01|action| 0|
|Alice|2016-05-02| 0| 1|
|Alice|2016-05-03|comedy| 0|
|Alice|2016-05-04|action| 1|
|Alice|2016-05-05|action| 0|
|Alice|2016-05-06|horror| 1|
| Bob|2016-05-01| art| 0|
| Bob|2016-05-02| 0| 1|
| Bob|2016-05-03| 0| 0|
| Bob|2016-05-04| art| 0|
| Bob|2016-05-05|comedy| 1|
| Bob|2016-05-06|action| 0|
+-----+----------+------+-----+
我收到错误
org.apache.spark.sql.AnalysisException: Window Frame specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$()) must match the required frame specifiedwindowframe(RowFrame, -1, -1);
at org.apa
我尝试了以下方法:
val wSpec2 = Window.partitionBy("name","genre").orderBy("date").rowsBetween(Long.MinValue, 0)
val test = genre.withColumn( "CumScore",genre("score")*2)
test.show()
val wSpec3 = Window.partitionBy("name").orderBy("date")
test.withColumn("CumScore_1",test("CumScore")+lag(test("CumScore"),1).over(wSpec3)).show()
我们需要定义另一个 window 函数,因为我们在将前一天的累计分数与今天的分数相加到一个新列时不需要指定行框。
您可以参考:http://xinhstechblog.blogspot.in/2016/04/spark-window-functions-for-dataframes.html
没有必要使用lag
,只需在用户上使用window分区,然后使用sum
:
val window = Window.partitionBy("name").orderBy("date").rowsBetween(Long.MinValue, 0)
genre.withColumn("CumScore", sum($"score").over(window))
使用问题中的输入数据,这将给出:
+-----+----------+------+-----+--------+
| name| date| genre|score|CumScore|
+-----+----------+------+-----+--------+
| Bob|2016-05-01| art| 0| 0|
| Bob|2016-05-02| 0| 1| 1|
| Bob|2016-05-03| 0| 0| 1|
| Bob|2016-05-04| art| 0| 1|
| Bob|2016-05-05|comedy| 1| 2|
| Bob|2016-05-06|action| 0| 2|
|Alice|2016-05-01|action| 0| 0|
|Alice|2016-05-02| 0| 1| 1|
|Alice|2016-05-03|comedy| 0| 1|
|Alice|2016-05-04|action| 1| 2|
|Alice|2016-05-05|action| 0| 2|
|Alice|2016-05-06|horror| 1| 3|
+-----+----------+------+-----+--------+
这里使用 lag
的问题在于,该列在创建它的同一表达式中使用(该列在 withColumn
表达式中使用。即使它是以前的值引用这是不允许的。
我有一个包含每天分数的数据框,我想计算每个用户的累积 运行 分数。我需要在新列上将前一天的累积分数与今天的分数相加,我尝试了 lag
函数进行计算,但由于某些原因它给出了错误。
这是我试过的代码:
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._
val genre = sc.parallelize(List(("Alice", "2016-05-01", "action",0),
("Alice", "2016-05-02", "0",1),
("Alice", "2016-05-03", "comedy",0),
("Alice", "2016-05-04", "action",1),
("Alice", "2016-05-05", "action",0),
("Alice", "2016-05-06", "horror",1),
("Bob", "2016-05-01", "art",0),
("Bob", "2016-05-02", "0",1),
("Bob", "2016-05-03", "0",0),
("Bob", "2016-05-04", "art",0),
("Bob", "2016-05-05", "comedy",1),
("Bob", "2016-05-06", "action",0))).
toDF("name", "date", "genre","score")
val wSpec2 = Window.partitionBy("name","genre").orderBy("date").rowsBetween(Long.MinValue, 0)
genre.withColumn( "CumScore",genre("score")*2+ lag(("CumScore"),1).over(wSpec2)*2 ).show()
数据框:
-----+----------+------+-----+
| name| date| genre|score|
+-----+----------+------+-----+
|Alice|2016-05-01|action| 0|
|Alice|2016-05-02| 0| 1|
|Alice|2016-05-03|comedy| 0|
|Alice|2016-05-04|action| 1|
|Alice|2016-05-05|action| 0|
|Alice|2016-05-06|horror| 1|
| Bob|2016-05-01| art| 0|
| Bob|2016-05-02| 0| 1|
| Bob|2016-05-03| 0| 0|
| Bob|2016-05-04| art| 0|
| Bob|2016-05-05|comedy| 1|
| Bob|2016-05-06|action| 0|
+-----+----------+------+-----+
我收到错误
org.apache.spark.sql.AnalysisException: Window Frame specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$()) must match the required frame specifiedwindowframe(RowFrame, -1, -1);
at org.apa
我尝试了以下方法:
val wSpec2 = Window.partitionBy("name","genre").orderBy("date").rowsBetween(Long.MinValue, 0)
val test = genre.withColumn( "CumScore",genre("score")*2)
test.show()
val wSpec3 = Window.partitionBy("name").orderBy("date")
test.withColumn("CumScore_1",test("CumScore")+lag(test("CumScore"),1).over(wSpec3)).show()
我们需要定义另一个 window 函数,因为我们在将前一天的累计分数与今天的分数相加到一个新列时不需要指定行框。
您可以参考:http://xinhstechblog.blogspot.in/2016/04/spark-window-functions-for-dataframes.html
没有必要使用lag
,只需在用户上使用window分区,然后使用sum
:
val window = Window.partitionBy("name").orderBy("date").rowsBetween(Long.MinValue, 0)
genre.withColumn("CumScore", sum($"score").over(window))
使用问题中的输入数据,这将给出:
+-----+----------+------+-----+--------+
| name| date| genre|score|CumScore|
+-----+----------+------+-----+--------+
| Bob|2016-05-01| art| 0| 0|
| Bob|2016-05-02| 0| 1| 1|
| Bob|2016-05-03| 0| 0| 1|
| Bob|2016-05-04| art| 0| 1|
| Bob|2016-05-05|comedy| 1| 2|
| Bob|2016-05-06|action| 0| 2|
|Alice|2016-05-01|action| 0| 0|
|Alice|2016-05-02| 0| 1| 1|
|Alice|2016-05-03|comedy| 0| 1|
|Alice|2016-05-04|action| 1| 2|
|Alice|2016-05-05|action| 0| 2|
|Alice|2016-05-06|horror| 1| 3|
+-----+----------+------+-----+--------+
这里使用 lag
的问题在于,该列在创建它的同一表达式中使用(该列在 withColumn
表达式中使用。即使它是以前的值引用这是不允许的。