Apache Spark - How to design a custom aggregation with window operations on event time

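I want to compute a supply-to-demand ratio. Each record has the fields time_to_pick_up, time_to_drop_off, pick_up_location, drop_off_location.

2019-01-01 00:22:21,2019-01-01 00:43:43,Union Sq,SoHo

I want to split each record into two records, so that the data looks like this: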

2019-01-01 00:22:21, Union Sq, PICK

2019-01-01 00:43:43, SoHo, DROP

2019-01-01 00:22:23, Union Sq, DROP

.....
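
A minimal sketch of that split, written against plain Scala collections rather than a Dataset (the same `flatMap` shape should carry over). The names `Trip`, `ZoneEvent`, and `SplitSketch` are hypothetical and only serve the illustration; timestamps are kept as strings for brevity.

```scala
object SplitSketch {
  // Hypothetical record types; the fields follow the question's description.
  final case class Trip(pickUpTime: String, dropOffTime: String,
                        pickUpLocation: String, dropOffLocation: String)
  final case class ZoneEvent(datetime: String, zone: String, event: String)

  // Each trip yields one PICK event at the pick-up zone
  // and one DROP event at the drop-off zone.
  def split(trip: Trip): Seq[ZoneEvent] = Seq(
    ZoneEvent(trip.pickUpTime, trip.pickUpLocation, "PICK"),
    ZoneEvent(trip.dropOffTime, trip.dropOffLocation, "DROP")
  )

  // Flatten all trips into a single sequence of events.
  def splitAll(trips: Seq[Trip]): Seq[ZoneEvent] = trips.flatMap(split)
}
```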

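Ratio = (PICK + DROP) / PICK = 2 / 1 = 2 (a DROP means a car becomes available, so supply increases by one; a PICK record contributes to both demand and supply). I want to report the ratio every 5 minutes over a one-hour window.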

 val perMinCount = ds.filter(col("zone") === "Union Sq").withWatermark("datetime", "10 minutes")
    .groupBy($"zone", window($"datetime", "1 hour", "5 minutes")).agg(?)

This is what I have so far. How can I implement it?

You can also solve this without writing a custom aggregation. You can use when/otherwise together with groupBy and agg to achieve this. For example:

//streaming df
+-------------------+--------+-----+
|datetime           |zone    |event|
+-------------------+--------+-----+
|2019-11-06 11:32:21|Union Sq|PICK |
|2019-11-06 11:32:22|SoHo    |DROP |
|2019-11-06 11:32:23|Union Sq|DROP |
+-------------------+--------+-----+

import org.apache.spark.sql.functions.{col, lit, when}

// PICK counts toward both demand and supply; DROP counts toward supply only.
val enrichedStreamingDf = streamingDf
  .withColumn("demand",
    when(col("event") === "PICK", lit(1))
      .when(col("event") === "DROP", lit(0))
      .otherwise(lit(0)))
  .withColumn("supply",
    when(col("event") === "PICK", lit(1))
      .when(col("event") === "DROP", lit(1))
      .otherwise(lit(0)))
  .filter(col("zone") === "Union Sq")

//enriched streaming df
+-------------------+--------+-----+------+------+
|datetime           |zone    |event|demand|supply|
+-------------------+--------+-----+------+------+
|2019-11-06 11:32:21|Union Sq|PICK |1     |1     |
|2019-11-06 11:32:23|Union Sq|DROP |0     |1     |
+-------------------+--------+-----+------+------+

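import org.apache.spark.sql.functions.{sum, window}
import spark.implicits._ // enables the $"..." syntax; assumes a SparkSession named `spark`

// Sliding window: 10 minutes long, advancing every 5 minutes.
val ratioDf = enrichedStreamingDf
  .withWatermark("datetime", "10 minutes")
  .groupBy(
    window($"datetime", "10 minutes", "5 minutes"),
    $"zone"
  )
  .agg((sum($"supply") / sum($"demand")).as("sup_dem_ratio"))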

//supply to demand ratio streaming df
+------------------------------------------+--------+-------------+
|window                                    |zone    |sup_dem_ratio|
+------------------------------------------+--------+-------------+
|[2019-11-06 11:25:00, 2019-11-06 11:35:00]|Union Sq|2.0          |
|[2019-11-06 11:30:00, 2019-11-06 11:40:00]|Union Sq|2.0          |
+------------------------------------------+--------+-------------+
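
To see why the two Union Sq events land in exactly those two windows, here is a small plain-Scala sketch of how a sliding window could be assigned to an event. It is not Spark's implementation; it assumes windows are half-open intervals [start, end) aligned to the Unix epoch, and it treats the timestamps as UTC. `WindowSketch` and `windowsFor` are hypothetical names.

```scala
import java.time.{LocalDateTime, ZoneOffset}

object WindowSketch {
  // All sliding windows [start, end) that contain `ts`, earliest first.
  // Assumption: window starts are multiples of `slideSec` since the epoch.
  def windowsFor(ts: LocalDateTime, sizeSec: Long, slideSec: Long)
      : Seq[(LocalDateTime, LocalDateTime)] = {
    val t = ts.toEpochSecond(ZoneOffset.UTC)
    // Latest window start that is not after the event.
    val lastStart = t - Math.floorMod(t, slideSec)
    Iterator.iterate(lastStart)(_ - slideSec)
      .takeWhile(start => start + sizeSec > t) // window must still cover t
      .map { start =>
        (LocalDateTime.ofEpochSecond(start, 0, ZoneOffset.UTC),
         LocalDateTime.ofEpochSecond(start + sizeSec, 0, ZoneOffset.UTC))
      }
      .toList
      .reverse
  }
}
```

With a 600-second size and a 300-second slide, the 11:32:21 event falls into the 11:25 to 11:35 and 11:30 to 11:40 windows, matching the table above. With a 3600-second size and the same slide, each event would fall into 12 overlapping windows.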
  

In this example I set the total window duration to "10 minutes"; you can use "1 hour" for your case.
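
One case the example data does not exercise is a window that contains only DROP events, where the summed demand is 0 and the ratio is undefined. The plain-Scala sketch below mirrors the demand/supply encoding and makes that case explicit with an `Option`; it is only an illustration of the arithmetic, not Spark code, and `RatioSketch` is a hypothetical name.

```scala
object RatioSketch {
  // (demand, supply) contribution of one event, mirroring the when/otherwise columns.
  def contribution(event: String): (Int, Int) = event match {
    case "PICK" => (1, 1)
    case "DROP" => (0, 1)
    case _      => (0, 0)
  }

  // Supply-to-demand ratio for the events of a single window.
  // Returns None when the window has no demand (no PICK events).
  def ratio(events: Seq[String]): Option[Double] = {
    val (demand, supply) = events.map(contribution).foldLeft((0, 0)) {
      case ((d, s), (dd, ds)) => (d + dd, s + ds)
    }
    if (demand == 0) None else Some(supply.toDouble / demand)
  }
}
```

For the two Union Sq events above, `RatioSketch.ratio(Seq("PICK", "DROP"))` gives `Some(2.0)`, the same value as in the result table.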