Specify default value for rowsBetween and rangeBetween in Spark
I have a question about window operations on Spark 1.6 DataFrames.
Suppose I have the following table:
id | MONTH  | number
1  | 201703 | 2
1  | 201704 | 3
1  | 201705 | 7
1  | 201706 | 6
Currently I am using the rowsBetween function:
val window = Window.partitionBy("id")
.orderBy(asc("MONTH"))
.rowsBetween(-2, 0)
randomDF.withColumn("counter", sum(col("number")).over(window))
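(randomDF itself is not shown in the question; for reference, a minimal sketch that builds an equivalent DataFrame for the table above, assuming a Spark 1.6 SQLContext named sqlContext, could be:)

import org.apache.spark.sql.functions._
import org.apache.spark.sql.expressions.Window

// column names taken from the table above; the construction is an assumption
val randomDF = sqlContext.createDataFrame(Seq(
  (1, 201703, 2),
  (1, 201704, 3),
  (1, 201705, 7),
  (1, 201706, 6)
)).toDF("id", "MONTH", "number")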
This gives me the following result:
id | MONTH  | number | counter
1  | 201703 | 2      | 2
1  | 201704 | 3      | 5
1  | 201705 | 7      | 12
1  | 201706 | 6      | 16
What I would like to achieve is to set a default value (as in lag() and lead()) when there are no preceding rows, for example '0', so that I get a result like this:
id | MONTH  | number | counter
1  | 201703 | 2      | 0
1  | 201704 | 3      | 0
1  | 201705 | 7      | 12
1  | 201706 | 6      | 16
I have looked through the documentation, but Spark 1.6 does not allow this, and I am wondering if there is some kind of workaround.
Thanks a lot!
How about something like this:
- add an extra lag step
- substitute the value with a case expression
Code:
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, Row}
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._

val rowsRdd: RDD[Row] = spark.sparkContext.parallelize(
  Seq(
    Row(1, 1, 201703, 2),
    Row(2, 1, 201704, 3),
    Row(3, 1, 201705, 7),
    Row(4, 1, 201706, 6)))

val schema: StructType = new StructType()
  .add(StructField("sortColumn", IntegerType, false))
  .add(StructField("id", IntegerType, false))
  .add(StructField("month", IntegerType, false))
  .add(StructField("number", IntegerType, false))

val df0: DataFrame = spark.createDataFrame(rowsRdd, schema)

val prevRows = 2

// running sum over the current row and the prevRows preceding rows
val window = Window.partitionBy("id")
  .orderBy(col("month"))
  .rowsBetween(-prevRows, 0)

// plain ordered window used only for the lag step
val window2 = Window.partitionBy("id")
  .orderBy(col("month"))

val df2 = df0.withColumn("counter", sum(col("number")).over(window))
// myLagTmp is null for the first prevRows rows of each partition
val df3 = df2.withColumn("myLagTmp", lag(lit(1), prevRows).over(window2))
// reset the counter to 0 wherever the window is not yet complete
val df4 = df3.withColumn("counter", expr("case when myLagTmp is null then 0 else counter end")).drop(col("myLagTmp"))
df4.sort("sortColumn").show()
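For the sample data above, the final show() should print something along these lines:

+----------+---+------+------+-------+
|sortColumn| id| month|number|counter|
+----------+---+------+------+-------+
|         1|  1|201703|     2|      0|
|         2|  1|201704|     3|      0|
|         3|  1|201705|     7|     12|
|         4|  1|201706|     6|     16|
+----------+---+------+------+-------+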
Thanks to @astro_asz's answer, I came up with the following solution:
val numberRowsBetween = 2

val window1 = Window.partitionBy("id").orderBy("MONTH")
val window2 = Window.partitionBy("id")
  .orderBy(asc("MONTH"))
  .rowsBetween(-(numberRowsBetween - 1), 0)

// lag(..., numberRowsBetween, 0) falls back to the default 0 while there are not
// enough preceding rows; that default is used here to reset the counter to 0
// (note: this also triggers whenever the lagged number itself is 0)
randomDF.withColumn("counter", when(lag(col("number"), numberRowsBetween, 0).over(window1) === 0, 0)
  .otherwise(sum(col("number")).over(window2)))
This solution sets '0' as the default value.
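A variant of the same idea, in case number can legitimately be 0, is to test the lagged value for null instead of relying on the 0 default (a sketch, not part of the original answer; it reuses window1, window2 and numberRowsBetween from above):

randomDF.withColumn("counter",
  when(lag(col("number"), numberRowsBetween).over(window1).isNull, 0)
    .otherwise(sum(col("number")).over(window2)))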