Spark Dataframe/数据集:通用条件累积和
Spark Dataframe/ Dataset: Generic Conditional cumulative sum
我有一个数据框,它有一些属性(C1 到 C2)、一个偏移量(以天为单位)和一些值(V1、V2)。
val inputDF= spark.sparkContext.parallelize(Seq((1,2,30, 100, -1),(1,2,30, 100, 0), (1,2,30, 100, 1),(11,21,30, 100, -1),(11,21,30, 100, 0), (11,21,30, 100, 1)), 10).toDF("c1", "c2", "v1", "v2", "offset")
inputDF: org.apache.spark.sql.DataFrame = [c1: int, c2: int ... 3 more fields]
scala> inputDF.show
+---+---+---+---+------+
| c1| c2| v1| v2|offset|
+---+---+---+---+------+
| 1| 2| 30|100| -1|
| 1| 2| 30|100| 0|
| 1| 2| 30|100| 1|
| 11| 21| 30|100| -1|
| 11| 21| 30|100| 0|
| 11| 21| 30|100| 1|
+---+---+---+---+------+
我需要做的是计算(c1,c2) 跨偏移量的V1、V2 的累积和。
我试过了,但这与适用于任何数据框的通用解决方案相去甚远。
import org.apache.spark.sql.expressions.Window
val groupKey = List("c1", "c2").map(x => col(x.trim))
val orderByKey = List("offset").map(x => col(x.trim))
val w = Window.partitionBy(groupKey: _*).orderBy(orderByKey: _*)
val outputDF = inputDF
.withColumn("cumulative_v1", sum(inputDF("v1")).over(w))
.withColumn("cumulative_v2", sum(inputDF("v2")).over(w))
+---+---+---+---+------+----------------------------
| c1| c2| v1| v2|offset|cumulative_v1| cumulative_v2|
+---+---+---+---+------+-------------|--------------|
| 1| 2| 30|100| -1|30 | 100 |
| 1| 2| 30|100| 0|60 | 200 |
| 1| 2| 30|100| 1|90 | 300 |
| 11| 21| 30|100| -1|30 | 100 |
| 11| 21| 30|100| 0|60 | 200 |
| 11| 21| 30|100| 1|90 | 300 |
+---+---+---+---+------+-----------------------------
挑战是[a]我需要跨多个不同的偏移量windows(-1到1),(-10到10),(-30到30)或任何其他[ b] 我需要在多个数据帧/数据集中使用这个函数,所以我希望有一个可以在 RDD/数据集中工作的通用函数。
关于如何在 Spark 2.0 中实现这一点有什么想法吗?
非常感谢您的帮助。谢谢!
这是仅使用数据帧的原始拍摄。
import org.apache.spark.sql.expressions.Window
val groupKey = List("c1", "c2").map(x => col(x.trim))
val orderByKey = List("offset").map(x => col(x.trim))
val w = Window.partitionBy(groupKey: _*).orderBy(orderByKey: _*)
val inputDF= spark
.sparkContext
.parallelize(Seq((1,2,30, 100, -1),(1,2,3, 100, -2),(1,2,140, 100, 2),(1,2,30, 100, 0), (1,2,30, 100, 1),(11,21,30, 100, -1),(11,21,30, 100, 0), (11,21,30, 100, 1)), 10)
.toDF("c1", "c2", "v1", "v2", "offset")
val outputDF = inputDF
.withColumn("cumulative_v1", sum(when($"offset".between(-1, 1), inputDF("v1")).otherwise(0)).over(w))
.withColumn("cumulative_v3", sum(when($"offset".between(-2, 2), inputDF("v1")).otherwise(0)).over(w))
.withColumn("cumulative_v2", sum(inputDF("v2")).over(w))
这会在单个 'value' 上针对不同的 windows 生成累积和。
scala> outputDF.show
+---+---+---+---+------+-------------+-------------+-------------+
| c1| c2| v1| v2|offset|cumulative_v1|cumulative_v3|cumulative_v2|
+---+---+---+---+------+-------------+-------------+-------------+
| 1| 2| 3|100| -2| 0| 0| 100|
| 1| 2| 30|100| -1| 30| 30| 200|
| 1| 2| 30|100| 0| 60| 60| 300|
| 1| 2| 30|100| 1| 90| 90| 400|
| 1| 2|140|100| 2| 90| 90| 500|
| 11| 21| 30|100| -1| 30| 30| 100|
| 11| 21| 30|100| 0| 60| 60| 200|
| 11| 21| 30|100| 1| 90| 90| 300|
+---+---+---+---+------+-------------+-------------+-------------+
这种方法的一些缺点 -
[1] 对于每个条件 window (-1,1), (-2,2) 或任何 (from_offset, to_offset),需要单独调用 sum()。
[2] 这不是通用函数。
我知道 spark 接受像这样的聚合函数的可变列列表 -
val exprs = Map("v1" -> "sum", "v2" -> "sum")
但我不确定如何为具有可变条件的 window 函数扩展它。我仍然很好奇是否有更好的模块化/可重用函数可以编写来解决这个问题。
解决此问题的另一种通用方法是使用 foldLeft,如此处所述 -
我有一个数据框,它有一些属性(C1 到 C2)、一个偏移量(以天为单位)和一些值(V1、V2)。
val inputDF= spark.sparkContext.parallelize(Seq((1,2,30, 100, -1),(1,2,30, 100, 0), (1,2,30, 100, 1),(11,21,30, 100, -1),(11,21,30, 100, 0), (11,21,30, 100, 1)), 10).toDF("c1", "c2", "v1", "v2", "offset")
inputDF: org.apache.spark.sql.DataFrame = [c1: int, c2: int ... 3 more fields]
scala> inputDF.show
+---+---+---+---+------+
| c1| c2| v1| v2|offset|
+---+---+---+---+------+
| 1| 2| 30|100| -1|
| 1| 2| 30|100| 0|
| 1| 2| 30|100| 1|
| 11| 21| 30|100| -1|
| 11| 21| 30|100| 0|
| 11| 21| 30|100| 1|
+---+---+---+---+------+
我需要做的是计算(c1,c2) 跨偏移量的V1、V2 的累积和。
我试过了,但这与适用于任何数据框的通用解决方案相去甚远。
import org.apache.spark.sql.expressions.Window
val groupKey = List("c1", "c2").map(x => col(x.trim))
val orderByKey = List("offset").map(x => col(x.trim))
val w = Window.partitionBy(groupKey: _*).orderBy(orderByKey: _*)
val outputDF = inputDF
.withColumn("cumulative_v1", sum(inputDF("v1")).over(w))
.withColumn("cumulative_v2", sum(inputDF("v2")).over(w))
+---+---+---+---+------+----------------------------
| c1| c2| v1| v2|offset|cumulative_v1| cumulative_v2|
+---+---+---+---+------+-------------|--------------|
| 1| 2| 30|100| -1|30 | 100 |
| 1| 2| 30|100| 0|60 | 200 |
| 1| 2| 30|100| 1|90 | 300 |
| 11| 21| 30|100| -1|30 | 100 |
| 11| 21| 30|100| 0|60 | 200 |
| 11| 21| 30|100| 1|90 | 300 |
+---+---+---+---+------+-----------------------------
挑战是[a]我需要跨多个不同的偏移量windows(-1到1),(-10到10),(-30到30)或任何其他[ b] 我需要在多个数据帧/数据集中使用这个函数,所以我希望有一个可以在 RDD/数据集中工作的通用函数。
关于如何在 Spark 2.0 中实现这一点有什么想法吗?
非常感谢您的帮助。谢谢!
这是仅使用数据帧的原始拍摄。
import org.apache.spark.sql.expressions.Window
val groupKey = List("c1", "c2").map(x => col(x.trim))
val orderByKey = List("offset").map(x => col(x.trim))
val w = Window.partitionBy(groupKey: _*).orderBy(orderByKey: _*)
val inputDF= spark
.sparkContext
.parallelize(Seq((1,2,30, 100, -1),(1,2,3, 100, -2),(1,2,140, 100, 2),(1,2,30, 100, 0), (1,2,30, 100, 1),(11,21,30, 100, -1),(11,21,30, 100, 0), (11,21,30, 100, 1)), 10)
.toDF("c1", "c2", "v1", "v2", "offset")
val outputDF = inputDF
.withColumn("cumulative_v1", sum(when($"offset".between(-1, 1), inputDF("v1")).otherwise(0)).over(w))
.withColumn("cumulative_v3", sum(when($"offset".between(-2, 2), inputDF("v1")).otherwise(0)).over(w))
.withColumn("cumulative_v2", sum(inputDF("v2")).over(w))
这会在单个 'value' 上针对不同的 windows 生成累积和。
scala> outputDF.show
+---+---+---+---+------+-------------+-------------+-------------+
| c1| c2| v1| v2|offset|cumulative_v1|cumulative_v3|cumulative_v2|
+---+---+---+---+------+-------------+-------------+-------------+
| 1| 2| 3|100| -2| 0| 0| 100|
| 1| 2| 30|100| -1| 30| 30| 200|
| 1| 2| 30|100| 0| 60| 60| 300|
| 1| 2| 30|100| 1| 90| 90| 400|
| 1| 2|140|100| 2| 90| 90| 500|
| 11| 21| 30|100| -1| 30| 30| 100|
| 11| 21| 30|100| 0| 60| 60| 200|
| 11| 21| 30|100| 1| 90| 90| 300|
+---+---+---+---+------+-------------+-------------+-------------+
这种方法的一些缺点 - [1] 对于每个条件 window (-1,1), (-2,2) 或任何 (from_offset, to_offset),需要单独调用 sum()。 [2] 这不是通用函数。
我知道 spark 接受像这样的聚合函数的可变列列表 -
val exprs = Map("v1" -> "sum", "v2" -> "sum")
但我不确定如何为具有可变条件的 window 函数扩展它。我仍然很好奇是否有更好的模块化/可重用函数可以编写来解决这个问题。
解决此问题的另一种通用方法是使用 foldLeft,如此处所述 -