在 Spark 中使用 Windows 函数进行每周聚合

Weekly Aggregation using Windows Function in Spark

我有从 2017 年 1 月 1 日到 2017 年 1 月 7 日的数据,这是一周的每周汇总。我按以下方式使用 window 函数

val df_v_3 = df_v_2.groupBy(window(col("DateTime"), "7 day"))
      .agg(sum("Value") as "aggregate_sum")
      .select("window.start", "window.end", "aggregate_sum")

我在数据框中的数据为

    DateTime,value
    2017-01-01T00:00:00.000+05:30,1.2
    2017-01-01T00:15:00.000+05:30,1.30
--
    2017-01-07T23:30:00.000+05:30,1.43
    2017-01-07T23:45:00.000+05:30,1.4

我得到的输出为:

2016-12-29T05:30:00.000+05:30,2017-01-05T05:30:00.000+05:30,723.87
2017-01-05T05:30:00.000+05:30,2017-01-12T05:30:00.000+05:30,616.74

显示我的一天是从2016年12月29日开始的,但实际数据是从2017年1月1日开始的,为什么会出现这个差值?

对于像这样的翻滚windows,可以设置开始时间的偏移量,更多信息可以在博客here中找到。使用滑动 window,但是,通过将 "window duration" 和 "sliding duration" 设置为相同的值,它将与具有起始偏移量的翻滚 window 相同。

语法如下,

window(column, window duration, sliding duration, starting offset)

根据您的值,我发现 64 小时的偏移量会给出 2017-01-01 00:00:00 的开始时间。

val data = Seq(("2017-01-01 00:00:00",1.0),
               ("2017-01-01 00:15:00",2.0),
               ("2017-01-08 23:30:00",1.43))
val df = data.toDF("DateTime","value")
  .withColumn("DateTime", to_timestamp($"DateTime", "yyyy-MM-dd HH:mm:ss"))

val df2 = df
  .groupBy(window(col("DateTime"), "1 week", "1 week", "64 hours"))
  .agg(sum("value") as "aggregate_sum")
  .select("window.start", "window.end", "aggregate_sum")

将给出此结果数据框:

+-------------------+-------------------+-------------+
|              start|                end|aggregate_sum|
+-------------------+-------------------+-------------+
|2017-01-01 00:00:00|2017-01-08 00:00:00|          3.0|
|2017-01-08 00:00:00|2017-01-15 00:00:00|         1.43|
+-------------------+-------------------+-------------+

使用 python API 的解决方案看起来更直观一些,因为 window 函数适用于以下选项: window(timeColumn, windowDuration, slideDuration=None, startTime=None) 看: https://spark.apache.org/docs/2.4.0/api/python/_modules/pyspark/sql/functions.html

The startTime is the offset with respect to 1970-01-01 00:00:00 UTC with which to start window intervals. For example, in order to have hourly tumbling windows that start 15 minutes past the hour, e.g. 12:15-13:15, 13:15-14:15... provide startTime as 15 minutes.

不需要使用 sliding duration 的解决方法,我使用了 3 天 "delay" 作为 startTime 来匹配所需的翻滚 window:

from datetime import datetime 
from pyspark.sql.functions import sum, window
df_ex = spark.createDataFrame([(datetime(2017,1,1, 0,0) , 1.), \
                               (datetime(2017,1,1,0,15) , 2.), \
                               (datetime(2017,1,8,23,30) , 1.43)], \
                               ["Datetime", "value"])

weekly_ex = df_ex \
            .groupBy(window("Datetime", "1 week", startTime="3 day" )) \
            .agg(sum("value").alias('aggregate_sum'))

weekly_ex.show(truncate=False)

同样的结果:

+------------------------------------------+-------------+
|window                                    |aggregate_sum|
+------------------------------------------+-------------+
|[2017-01-01 00:00:00, 2017-01-08 00:00:00]|3.0          |
|[2017-01-08 00:00:00, 2017-01-15 00:00:00]|1.43         |
+------------------------------------------+-------------+