Spark 中的累计和
Cumulative sum in Spark
我想在 Spark 中做累加。这里是寄存器table(输入):
+---------------+-------------------+----+----+----+
| product_id| date_time| ack|val1|val2|
+---------------+-------------------+----+----+----+
|4008607333T.upf|2017-12-13:02:27:01|3-46| 53| 52|
|4008607333T.upf|2017-12-13:02:27:03|3-47| 53| 52|
|4008607333T.upf|2017-12-13:02:27:08|3-46| 53| 52|
|4008607333T.upf|2017-12-13:02:28:01|3-47| 53| 52|
|4008607333T.upf|2017-12-13:02:28:07|3-46| 15| 1|
+---------------+-------------------+----+----+----+
Hive 查询:
select *, SUM(val1) over ( Partition by product_id, ack order by date_time rows between unbounded preceding and current row ) val1_sum, SUM(val2) over ( Partition by product_id, ack order by date_time rows between unbounded preceding and current row ) val2_sum from test
输出:
+---------------+-------------------+----+----+----+-------+--------+
| product_id| date_time| ack|val1|val2|val_sum|val2_sum|
+---------------+-------------------+----+----+----+-------+--------+
|4008607333T.upf|2017-12-13:02:27:01|3-46| 53| 52| 53| 52|
|4008607333T.upf|2017-12-13:02:27:08|3-46| 53| 52| 106| 104|
|4008607333T.upf|2017-12-13:02:28:07|3-46| 15| 1| 121| 105|
|4008607333T.upf|2017-12-13:02:27:03|3-47| 53| 52| 53| 52|
|4008607333T.upf|2017-12-13:02:28:01|3-47| 53| 52| 106| 104|
+---------------+-------------------+----+----+----+-------+--------+
使用 Spark 逻辑,我得到与上面相同的输出:
import org.apache.spark.sql.expressions.Window
val w = Window.partitionBy('product_id, 'ack).orderBy('date_time)
import org.apache.spark.sql.functions._
val newDf = inputDF.withColumn("val_sum", sum('val1) over w).withColumn("val2_sum", sum('val2) over w)
newDf.show
但是,当我在 spark 集群上尝试此逻辑时 val_sum
值将是累计总和的一半,有时会有所不同。我不知道为什么它会发生在 spark cluster 上。是因为分区吗?
如何对 Spark 集群上的列求和?
要使用 DataFrame API 获取累计和,您应该使用 rowsBetween
window 方法。在 Spark 2.1 和更高版本 中创建 window 如下:
val w = Window.partitionBy($"product_id", $"ack")
.orderBy($"date_time")
.rowsBetween(Window.unboundedPreceding, Window.currentRow)
这将告诉 Spark 使用从分区开始到当前行的值。使用 旧版本 的 Spark,使用 rowsBetween(Long.MinValue, 0)
以获得相同的效果。
要使用window,使用与之前相同的方法:
val newDf = inputDF.withColumn("val_sum", sum($"val1").over(w))
.withColumn("val2_sum", sum($"val2").over(w))
我想在 Spark 中做累加。这里是寄存器table(输入):
+---------------+-------------------+----+----+----+
| product_id| date_time| ack|val1|val2|
+---------------+-------------------+----+----+----+
|4008607333T.upf|2017-12-13:02:27:01|3-46| 53| 52|
|4008607333T.upf|2017-12-13:02:27:03|3-47| 53| 52|
|4008607333T.upf|2017-12-13:02:27:08|3-46| 53| 52|
|4008607333T.upf|2017-12-13:02:28:01|3-47| 53| 52|
|4008607333T.upf|2017-12-13:02:28:07|3-46| 15| 1|
+---------------+-------------------+----+----+----+
Hive 查询:
select *, SUM(val1) over ( Partition by product_id, ack order by date_time rows between unbounded preceding and current row ) val1_sum, SUM(val2) over ( Partition by product_id, ack order by date_time rows between unbounded preceding and current row ) val2_sum from test
输出:
+---------------+-------------------+----+----+----+-------+--------+
| product_id| date_time| ack|val1|val2|val_sum|val2_sum|
+---------------+-------------------+----+----+----+-------+--------+
|4008607333T.upf|2017-12-13:02:27:01|3-46| 53| 52| 53| 52|
|4008607333T.upf|2017-12-13:02:27:08|3-46| 53| 52| 106| 104|
|4008607333T.upf|2017-12-13:02:28:07|3-46| 15| 1| 121| 105|
|4008607333T.upf|2017-12-13:02:27:03|3-47| 53| 52| 53| 52|
|4008607333T.upf|2017-12-13:02:28:01|3-47| 53| 52| 106| 104|
+---------------+-------------------+----+----+----+-------+--------+
使用 Spark 逻辑,我得到与上面相同的输出:
import org.apache.spark.sql.expressions.Window
val w = Window.partitionBy('product_id, 'ack).orderBy('date_time)
import org.apache.spark.sql.functions._
val newDf = inputDF.withColumn("val_sum", sum('val1) over w).withColumn("val2_sum", sum('val2) over w)
newDf.show
但是,当我在 spark 集群上尝试此逻辑时 val_sum
值将是累计总和的一半,有时会有所不同。我不知道为什么它会发生在 spark cluster 上。是因为分区吗?
如何对 Spark 集群上的列求和?
要使用 DataFrame API 获取累计和,您应该使用 rowsBetween
window 方法。在 Spark 2.1 和更高版本 中创建 window 如下:
val w = Window.partitionBy($"product_id", $"ack")
.orderBy($"date_time")
.rowsBetween(Window.unboundedPreceding, Window.currentRow)
这将告诉 Spark 使用从分区开始到当前行的值。使用 旧版本 的 Spark,使用 rowsBetween(Long.MinValue, 0)
以获得相同的效果。
要使用window,使用与之前相同的方法:
val newDf = inputDF.withColumn("val_sum", sum($"val1").over(w))
.withColumn("val2_sum", sum($"val2").over(w))