部件的 PySpark 结构化流式传输和过滤处理

PySpark structured streaming and filtered processing for parts

我想评估 Spark 2.4 中的流式(未绑定)数据帧:

time          id   value
6:00:01.000   1     333
6:00:01.005   1     123
6:00:01.050   2     544
6:00:01.060   2     544

当id 1的所有数据都进入dataframe,下一个id 2的数据到来时,我想对id 1的完整数据进行计算。但是我该怎么做呢?我想我不能使用 window 函数,因为我不知道提前的时间也因每个 id 而异。而且我也不知道除了流数据帧之外的其他来源的id。

我想到的唯一解决方案包含变量比较(内存)和 while 循环:

id_old = 0 # start value
while true:
  id_cur = id_from_dataframe
  if id_cur != id_old: # id has changed
      do calulation for id_cur
      id_old = id_cur

但我认为这不是正确的解决方案。你能给我一个帮助我的提示或文档吗,因为我找不到示例或文档。

我明白了运行水印和分组的结合:

import pyspark.sql.functions as F

d2 = d1.withWatermark("time", "60 second") \
    .groupby('id', \
             F.window('time', "40 second")) \
    .agg(
         F.count("*").alias("count"), \
         F.min("time").alias("time_start"), \
         F.max("time").alias("time_stop"), \
         F.round(F.avg("value"),1).alias('value_avg'))

大多数文档仅显示了仅按时间分组的基本内容,我只看到了一个带有另一个分组参数的示例,因此我将 'id'那里。