Pyspark Streaming - display starting window only from [window_start, window_end]

I am reading a list of CSV files as a stream and bucketing the timestamps into 1-hour windows.

    import pyspark.sql.functions as F

    stream = streaming.selectExpr("car", "cost", "timestamp")\
        .withWatermark("timestamp", "30 seconds")\
        .groupBy(F.col("car"), F.window("timestamp", "1 hour").alias("tmst_window"))\
        .agg(F.sum("cost").alias("agg_cost"))

    +--------------+------------------------------------------+------------------+
    |car           |tmst_window                               |agg_cost          |
    +--------------+------------------------------------------+------------------+
    |Toyota        |[2010-12-01 14:00:00, 2010-12-01 15:00:00]|10                |
    |Ford          |[2010-12-01 14:00:00, 2010-12-01 15:00:00]|20                |
    |Audi          |[2010-12-01 13:00:00, 2010-12-01 14:00:00]|30                |

How can I display only the start of the window and exclude the window-end timestamp? I want the results in real time, so I don't want to store them in a temporary dataframe and then split/explode the data. How can I rewrite the streaming query above to produce the output below?

    +--------------+--------------------------------------+------------------+
    |car           |tmst_window                           |agg_cost          |
    +--------------+--------------------------------------+------------------+
    |Toyota        |2010-12-01 14:00:00                   |10                |
    |Ford          |2010-12-01 14:00:00                   |20                |
    |Audi          |2010-12-01 13:00:00                   |30                |

If you print out the schema of stream, you will notice that the column tmst_window is a struct type containing the elements start and end:
    root
     |-- car: string (nullable = true)
     |-- tmst_window: struct (nullable = false)
     |    |-- start: timestamp (nullable = true)
     |    |-- end: timestamp (nullable = true)
     |-- agg_cost: double (nullable = true)

So you can select the start element with F.col('tmst_window')['start'], or more simply F.col('tmst_window.start'), and alias it back to tmst_window.