Apache Spark - How to design own aggregation with Window Operations on Event Time
I want to calculate a supply-to-demand ratio. The raw data is recorded with the fields time_to_pick_up, time_to_drop_off, pick_up_location, drop_off_location:
2019-01-01 00:22:21,2019-01-01 00:43:43,Union Sq,SoHo
I split each record into two records, so the data looks like this:
2019-01-01 00:22:21, Union Sq, PICK
2019-01-01 00:43:43, SoHo, DROP
2019-01-01 00:22:23, Union Sq, DROP
.....
ratio = (PICK + DROP) / PICK = 2/1 = 2
(A DROP means a car becomes available, so it adds one to supply; a PICK record contributes to both demand and supply.)
I want to report the ratio over a one-hour window, sliding every 5 minutes.
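The ratio arithmetic can be checked in plain Scala (an illustrative sketch only, not Spark code; `supplyDemandRatio` is a hypothetical name):

```scala
// Plain-Scala check of the ratio arithmetic:
// every PICK adds 1 to demand and 1 to supply; every DROP adds 1 to supply only.
def supplyDemandRatio(events: Seq[String]): Double = {
  val demand = events.count(_ == "PICK")
  val supply = events.count(e => e == "PICK" || e == "DROP")
  supply.toDouble / demand
}

// the Union Sq events from the example above: one PICK, one DROP
println(supplyDemandRatio(Seq("PICK", "DROP"))) // 2.0
```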
val perMinCount = ds
  .filter(col("zone") === "Union Sq")
  .withWatermark("datetime", "10 minutes")
  .groupBy($"zone", window($"datetime", "1 hour", "5 minutes"))
  .agg(?)
This is what I have so far. How can I implement the aggregation?
You can solve this without writing a custom aggregation: combine when/otherwise with groupBy/agg. For example:
//streaming df
+-------------------+--------+-----+
|datetime |zone |event|
+-------------------+--------+-----+
|2019-11-06 11:32:21|Union Sq|PICK |
|2019-11-06 11:32:22|SoHo |DROP |
|2019-11-06 11:32:23|Union Sq|DROP |
+-------------------+--------+-----+
val enrichedStreamingDf = streamingDf
  .withColumn("demand",
    when(col("event") === "PICK", lit(1))
      .when(col("event") === "DROP", lit(0))
      .otherwise(lit(0)))
  .withColumn("supply",
    when(col("event") === "PICK", lit(1))
      .when(col("event") === "DROP", lit(1))
      .otherwise(lit(0)))
  .filter(col("zone") === "Union Sq")
//enriched streaming df
+-------------------+--------+-----+------+------+
|datetime |zone |event|demand|supply|
+-------------------+--------+-----+------+------+
|2019-11-06 11:32:21|Union Sq|PICK |1 |1 |
|2019-11-06 11:32:23|Union Sq|DROP |0 |1 |
+-------------------+--------+-----+------+------+
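The per-event encoding behind those two columns can be mirrored as a plain Scala function for clarity (a sketch; `encode` is an illustrative name, not part of the Spark job):

```scala
// Demand/supply encoding per event type, returning (demand, supply),
// matching the when/otherwise columns in the DataFrame above.
def encode(event: String): (Int, Int) = event match {
  case "PICK" => (1, 1) // a pick-up is demand, and the trip also counts as supply
  case "DROP" => (0, 1) // a drop-off frees a car: supply only
  case _      => (0, 0) // anything else contributes nothing
}

println(encode("PICK")) // (1,1)
```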
val ratioDf = enrichedStreamingDf
  .withWatermark("datetime", "10 minutes")
  .groupBy(window($"datetime", "10 minutes", "5 minutes"), $"zone")
  .agg((sum($"supply") / sum($"demand")).as("sup_dem_ratio"))
//supply to demand ratio streaming df
+------------------------------------------+--------+-------------+
|window |zone |sup_dem_ratio|
+------------------------------------------+--------+-------------+
|[2019-11-06 11:25:00, 2019-11-06 11:35:00]|Union Sq|2.0 |
|[2019-11-06 11:30:00, 2019-11-06 11:40:00]|Union Sq|2.0 |
+------------------------------------------+--------+-------------+
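Each event lands in duration/slide overlapping windows (here 10 / 5 = 2), which is why the single PICK/DROP pair shows up in two rows. A rough plain-Scala sketch of that assignment arithmetic, working in epoch seconds (`windowsFor` is an illustrative helper, not Spark API):

```scala
// Which sliding windows [start, end) contain a given event time t?
// Approximates Spark's event-time window assignment for window(duration, slide).
def windowsFor(t: Long, duration: Long, slide: Long): Seq[(Long, Long)] = {
  // earliest window start that could still contain t
  val firstStart = t - t % slide - (duration - slide)
  (firstStart to t by slide)
    .map(start => (start, start + duration))
    .filter { case (start, end) => start <= t && t < end }
}

// 11:32:21 as seconds past 11:00 = 1941; 10-minute windows sliding every 5 minutes
println(windowsFor(1941L, 600L, 300L)) // two windows: 11:25-11:35 and 11:30-11:40
```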
In this example I used "10 minutes" as the total window duration; for your case you can use "1 hour" instead.