在 Spark 中使用 Windows 函数进行每周聚合
Weekly Aggregation using Windows Function in Spark
我有从 2017 年 1 月 1 日到 2017 年 1 月 7 日的数据,这是一周的每周汇总。我按以下方式使用 window 函数
val df_v_3 = df_v_2.groupBy(window(col("DateTime"), "7 day"))
.agg(sum("Value") as "aggregate_sum")
.select("window.start", "window.end", "aggregate_sum")
我在数据框中的数据为
DateTime,value
2017-01-01T00:00:00.000+05:30,1.2
2017-01-01T00:15:00.000+05:30,1.30
--
2017-01-07T23:30:00.000+05:30,1.43
2017-01-07T23:45:00.000+05:30,1.4
我得到的输出为:
2016-12-29T05:30:00.000+05:30,2017-01-05T05:30:00.000+05:30,723.87
2017-01-05T05:30:00.000+05:30,2017-01-12T05:30:00.000+05:30,616.74
显示我的一天是从2016年12月29日开始的,但实际数据是从2017年1月1日开始的,为什么会出现这个差值?
对于像这样的翻滚windows,可以设置开始时间的偏移量,更多信息可以在博客here中找到。使用滑动 window,但是,通过将 "window duration" 和 "sliding duration" 设置为相同的值,它将与具有起始偏移量的翻滚 window 相同。
语法如下,
window(column, window duration, sliding duration, starting offset)
根据您的值,我发现 64 小时的偏移量会给出 2017-01-01 00:00:00
的开始时间。
val data = Seq(("2017-01-01 00:00:00",1.0),
("2017-01-01 00:15:00",2.0),
("2017-01-08 23:30:00",1.43))
val df = data.toDF("DateTime","value")
.withColumn("DateTime", to_timestamp($"DateTime", "yyyy-MM-dd HH:mm:ss"))
val df2 = df
.groupBy(window(col("DateTime"), "1 week", "1 week", "64 hours"))
.agg(sum("value") as "aggregate_sum")
.select("window.start", "window.end", "aggregate_sum")
将给出此结果数据框:
+-------------------+-------------------+-------------+
| start| end|aggregate_sum|
+-------------------+-------------------+-------------+
|2017-01-01 00:00:00|2017-01-08 00:00:00| 3.0|
|2017-01-08 00:00:00|2017-01-15 00:00:00| 1.43|
+-------------------+-------------------+-------------+
使用 python API 的解决方案看起来更直观一些,因为 window
函数适用于以下选项:
window(timeColumn, windowDuration, slideDuration=None, startTime=None)
看:
https://spark.apache.org/docs/2.4.0/api/python/_modules/pyspark/sql/functions.html
The startTime is the offset with respect to 1970-01-01 00:00:00 UTC
with which to start window intervals. For example, in order to have
hourly tumbling windows that start 15 minutes past the hour, e.g.
12:15-13:15, 13:15-14:15... provide startTime
as 15 minutes
.
不需要使用 sliding duration
的解决方法,我使用了 3 天 "delay" 作为 startTime
来匹配所需的翻滚 window:
from datetime import datetime
from pyspark.sql.functions import sum, window
df_ex = spark.createDataFrame([(datetime(2017,1,1, 0,0) , 1.), \
(datetime(2017,1,1,0,15) , 2.), \
(datetime(2017,1,8,23,30) , 1.43)], \
["Datetime", "value"])
weekly_ex = df_ex \
.groupBy(window("Datetime", "1 week", startTime="3 day" )) \
.agg(sum("value").alias('aggregate_sum'))
weekly_ex.show(truncate=False)
同样的结果:
+------------------------------------------+-------------+
|window |aggregate_sum|
+------------------------------------------+-------------+
|[2017-01-01 00:00:00, 2017-01-08 00:00:00]|3.0 |
|[2017-01-08 00:00:00, 2017-01-15 00:00:00]|1.43 |
+------------------------------------------+-------------+
我有从 2017 年 1 月 1 日到 2017 年 1 月 7 日的数据,这是一周的每周汇总。我按以下方式使用 window 函数
val df_v_3 = df_v_2.groupBy(window(col("DateTime"), "7 day"))
.agg(sum("Value") as "aggregate_sum")
.select("window.start", "window.end", "aggregate_sum")
我在数据框中的数据为
DateTime,value
2017-01-01T00:00:00.000+05:30,1.2
2017-01-01T00:15:00.000+05:30,1.30
--
2017-01-07T23:30:00.000+05:30,1.43
2017-01-07T23:45:00.000+05:30,1.4
我得到的输出为:
2016-12-29T05:30:00.000+05:30,2017-01-05T05:30:00.000+05:30,723.87
2017-01-05T05:30:00.000+05:30,2017-01-12T05:30:00.000+05:30,616.74
显示我的一天是从2016年12月29日开始的,但实际数据是从2017年1月1日开始的,为什么会出现这个差值?
对于像这样的翻滚windows,可以设置开始时间的偏移量,更多信息可以在博客here中找到。使用滑动 window,但是,通过将 "window duration" 和 "sliding duration" 设置为相同的值,它将与具有起始偏移量的翻滚 window 相同。
语法如下,
window(column, window duration, sliding duration, starting offset)
根据您的值,我发现 64 小时的偏移量会给出 2017-01-01 00:00:00
的开始时间。
val data = Seq(("2017-01-01 00:00:00",1.0),
("2017-01-01 00:15:00",2.0),
("2017-01-08 23:30:00",1.43))
val df = data.toDF("DateTime","value")
.withColumn("DateTime", to_timestamp($"DateTime", "yyyy-MM-dd HH:mm:ss"))
val df2 = df
.groupBy(window(col("DateTime"), "1 week", "1 week", "64 hours"))
.agg(sum("value") as "aggregate_sum")
.select("window.start", "window.end", "aggregate_sum")
将给出此结果数据框:
+-------------------+-------------------+-------------+
| start| end|aggregate_sum|
+-------------------+-------------------+-------------+
|2017-01-01 00:00:00|2017-01-08 00:00:00| 3.0|
|2017-01-08 00:00:00|2017-01-15 00:00:00| 1.43|
+-------------------+-------------------+-------------+
使用 python API 的解决方案看起来更直观一些,因为 window
函数适用于以下选项:
window(timeColumn, windowDuration, slideDuration=None, startTime=None)
看:
https://spark.apache.org/docs/2.4.0/api/python/_modules/pyspark/sql/functions.html
The startTime is the offset with respect to 1970-01-01 00:00:00 UTC with which to start window intervals. For example, in order to have hourly tumbling windows that start 15 minutes past the hour, e.g. 12:15-13:15, 13:15-14:15... provide
startTime
as15 minutes
.
不需要使用 sliding duration
的解决方法,我使用了 3 天 "delay" 作为 startTime
来匹配所需的翻滚 window:
from datetime import datetime
from pyspark.sql.functions import sum, window
df_ex = spark.createDataFrame([(datetime(2017,1,1, 0,0) , 1.), \
(datetime(2017,1,1,0,15) , 2.), \
(datetime(2017,1,8,23,30) , 1.43)], \
["Datetime", "value"])
weekly_ex = df_ex \
.groupBy(window("Datetime", "1 week", startTime="3 day" )) \
.agg(sum("value").alias('aggregate_sum'))
weekly_ex.show(truncate=False)
同样的结果:
+------------------------------------------+-------------+
|window |aggregate_sum|
+------------------------------------------+-------------+
|[2017-01-01 00:00:00, 2017-01-08 00:00:00]|3.0 |
|[2017-01-08 00:00:00, 2017-01-15 00:00:00]|1.43 |
+------------------------------------------+-------------+