部件的 PySpark 结构化流式传输和过滤处理
PySpark structured streaming and filtered processing for parts
我想评估 Spark 2.4 中的流式(未绑定)数据帧:
time id value
6:00:01.000 1 333
6:00:01.005 1 123
6:00:01.050 2 544
6:00:01.060 2 544
当id 1的所有数据都进入dataframe,下一个id 2的数据到来时,我想对id 1的完整数据进行计算。但是我该怎么做呢?我想我不能使用 window 函数,因为我不知道提前的时间也因每个 id 而异。而且我也不知道除了流数据帧之外的其他来源的id。
我想到的唯一解决方案包含变量比较(内存)和 while 循环:
id_old = 0 # start value
while true:
id_cur = id_from_dataframe
if id_cur != id_old: # id has changed
do calulation for id_cur
id_old = id_cur
但我认为这不是正确的解决方案。你能给我一个帮助我的提示或文档吗,因为我找不到示例或文档。
我明白了运行水印和分组的结合:
import pyspark.sql.functions as F
d2 = d1.withWatermark("time", "60 second") \
.groupby('id', \
F.window('time', "40 second")) \
.agg(
F.count("*").alias("count"), \
F.min("time").alias("time_start"), \
F.max("time").alias("time_stop"), \
F.round(F.avg("value"),1).alias('value_avg'))
大多数文档仅显示了仅按时间分组的基本内容,我只看到了一个带有另一个分组参数的示例,因此我将 'id'那里。
我想评估 Spark 2.4 中的流式(未绑定)数据帧:
time id value
6:00:01.000 1 333
6:00:01.005 1 123
6:00:01.050 2 544
6:00:01.060 2 544
当id 1的所有数据都进入dataframe,下一个id 2的数据到来时,我想对id 1的完整数据进行计算。但是我该怎么做呢?我想我不能使用 window 函数,因为我不知道提前的时间也因每个 id 而异。而且我也不知道除了流数据帧之外的其他来源的id。
我想到的唯一解决方案包含变量比较(内存)和 while 循环:
id_old = 0 # start value
while true:
id_cur = id_from_dataframe
if id_cur != id_old: # id has changed
do calulation for id_cur
id_old = id_cur
但我认为这不是正确的解决方案。你能给我一个帮助我的提示或文档吗,因为我找不到示例或文档。
我明白了运行水印和分组的结合:
import pyspark.sql.functions as F
d2 = d1.withWatermark("time", "60 second") \
.groupby('id', \
F.window('time', "40 second")) \
.agg(
F.count("*").alias("count"), \
F.min("time").alias("time_start"), \
F.max("time").alias("time_stop"), \
F.round(F.avg("value"),1).alias('value_avg'))
大多数文档仅显示了仅按时间分组的基本内容,我只看到了一个带有另一个分组参数的示例,因此我将 'id'那里。