将数据流式传输到 delta table 并保存最新值
Streaming data to delta table with saving the latest values
我正在将一些温度数据从 Azure 事件中心流式传输到 Databricks,并希望将最新值存储在增量 table 中。对于每个传感器的温度值,我取的是最后五分钟的最大值。我似乎遇到了 'upsert' 到增量 table 的障碍。每台设备每 10-15 秒发送一次数据。我不确定我是否正确使用了 writeStream 或者可能必须在数据帧上使用 window 函数来插入最新的聚合值。
到目前为止,我已经在pysprak中创建了一个基本示例,看看是否可以完成
#This sets up the data frame
df = spark.readStream.format("eventhubs").options(**ehConf).load().selectExpr("cast (body as string) as body")
# rounds up the time into 5 minutes
df = df.select(
get_json_object(df.body,'$.sensorId').alias('sensorId'),
get_json_object(df.body,'$.count').alias('temp'),
to_timestamp(from_unixtime(round(((get_json_object(df.body,'$.timestamp')/1000)/300))*300.0 ,"yyyy-MM-dd HH:mm:ss")).alias("roundedDatetime")
)
# Groups by the sensor id and round date
df = df.groupBy("sensorId", "roundedDatetime").agg(max("temp").cast("int").alias("temp"))
一切正常,我可以看到 5 分钟聚合级别的数据
# Should insert trigger the batch every five minutes
query = (df.writeStream.format("delta").trigger(processingTime="5 minutes").foreachBatch(upsertToDelta).outputMode("update").start())
# this is my basic batch function, taken from the example docs on streaming
def upsertToDelta(microbatchdf, batchId):
microbatchdf.createOrReplaceTempView("updates")
microbatchdf._jdf.sparkSession().sql("""
MERGE INTO latestSensorReadings t
USING updates s
ON s.sensorId = t.sensorId
WHEN MATCHED THEN UPDATE SET *
WHEN NOT MATCHED THEN INSERT *
""")
因此,当第一个 运行 进入一个空的增量 table 时,没问题,五分钟后出现合并冲突,因为它试图插入相同的值。它是否试图更新整个数据框而不是最新的项目?
我看过滑动 windows 对事件时间进行分组,但这似乎不起作用。我正在考虑在微批处理函数中添加一个 windowing 函数,这样它只会在有多个项目时插入最新值,例如 10:00am 和 10:05am 在四舍五入的值中,它将采用 10:05 一个。建议?我想我可能不太正确触发?我试过将它降低和提高一分钟,但没有任何乐趣。
我想你忘了给 t 作为你最新的 SensorReadings table 的别名。你能试试吗:
MERGE INTO latestSensorReadings t
USING updates s
ON s.sensorId = t.sensorId
WHEN MATCHED THEN UPDATE SET *
WHEN NOT MATCHED THEN INSERT *
因此,在环顾四周并深入研究流媒体之后,我发现我的做法似乎是错误的。为了让它做我想做的事,我需要在处理批次之前删除分组。
#This sets up the data frame
df = spark.readStream.format("eventhubs").options(**ehConf).load().selectExpr("cast (body as string) as body")
# get the details from the event hub binary
df = df.select(
get_json_object(df.body,'$.sensorId').alias('sensorId'),
get_json_object(df.body,'$.count').alias('temp'))
所以我只是获取详细信息,然后每 5 分钟处理一次该批次。所以我的批处理函数看起来像:
# Should insert trigger the batch every five minutes
query = (df.writeStream.format("delta").trigger(processingTime="1 minutes").foreachBatch(upsertToDelta).outputMode("update").start())
# this is my updated batch function, now doing the grouping
def upsertToDelta(microbatchdf, batchId):
microbatchdf = microbatchdf.groupBy("sensorId").agg(max("temp").cast("int").alias("temp"))
microbatchdf = microbatchdf.withColumn("latestDatetime", current_timestamp())
microbatchdf.createOrReplaceTempView("updates")
microbatchdf._jdf.sparkSession().sql("""
MERGE INTO latestSensorReadings t
USING updates s
ON s.sensorId = t.sensorId
WHEN MATCHED THEN UPDATE SET *
WHEN NOT MATCHED THEN INSERT *
""")
我正在将一些温度数据从 Azure 事件中心流式传输到 Databricks,并希望将最新值存储在增量 table 中。对于每个传感器的温度值,我取的是最后五分钟的最大值。我似乎遇到了 'upsert' 到增量 table 的障碍。每台设备每 10-15 秒发送一次数据。我不确定我是否正确使用了 writeStream 或者可能必须在数据帧上使用 window 函数来插入最新的聚合值。
到目前为止,我已经在pysprak中创建了一个基本示例,看看是否可以完成
#This sets up the data frame
df = spark.readStream.format("eventhubs").options(**ehConf).load().selectExpr("cast (body as string) as body")
# rounds up the time into 5 minutes
df = df.select(
get_json_object(df.body,'$.sensorId').alias('sensorId'),
get_json_object(df.body,'$.count').alias('temp'),
to_timestamp(from_unixtime(round(((get_json_object(df.body,'$.timestamp')/1000)/300))*300.0 ,"yyyy-MM-dd HH:mm:ss")).alias("roundedDatetime")
)
# Groups by the sensor id and round date
df = df.groupBy("sensorId", "roundedDatetime").agg(max("temp").cast("int").alias("temp"))
一切正常,我可以看到 5 分钟聚合级别的数据
# Should insert trigger the batch every five minutes
query = (df.writeStream.format("delta").trigger(processingTime="5 minutes").foreachBatch(upsertToDelta).outputMode("update").start())
# this is my basic batch function, taken from the example docs on streaming
def upsertToDelta(microbatchdf, batchId):
microbatchdf.createOrReplaceTempView("updates")
microbatchdf._jdf.sparkSession().sql("""
MERGE INTO latestSensorReadings t
USING updates s
ON s.sensorId = t.sensorId
WHEN MATCHED THEN UPDATE SET *
WHEN NOT MATCHED THEN INSERT *
""")
因此,当第一个 运行 进入一个空的增量 table 时,没问题,五分钟后出现合并冲突,因为它试图插入相同的值。它是否试图更新整个数据框而不是最新的项目?
我看过滑动 windows 对事件时间进行分组,但这似乎不起作用。我正在考虑在微批处理函数中添加一个 windowing 函数,这样它只会在有多个项目时插入最新值,例如 10:00am 和 10:05am 在四舍五入的值中,它将采用 10:05 一个。建议?我想我可能不太正确触发?我试过将它降低和提高一分钟,但没有任何乐趣。
我想你忘了给 t 作为你最新的 SensorReadings table 的别名。你能试试吗:
MERGE INTO latestSensorReadings t
USING updates s
ON s.sensorId = t.sensorId
WHEN MATCHED THEN UPDATE SET *
WHEN NOT MATCHED THEN INSERT *
因此,在环顾四周并深入研究流媒体之后,我发现我的做法似乎是错误的。为了让它做我想做的事,我需要在处理批次之前删除分组。
#This sets up the data frame
df = spark.readStream.format("eventhubs").options(**ehConf).load().selectExpr("cast (body as string) as body")
# get the details from the event hub binary
df = df.select(
get_json_object(df.body,'$.sensorId').alias('sensorId'),
get_json_object(df.body,'$.count').alias('temp'))
所以我只是获取详细信息,然后每 5 分钟处理一次该批次。所以我的批处理函数看起来像:
# Should insert trigger the batch every five minutes
query = (df.writeStream.format("delta").trigger(processingTime="1 minutes").foreachBatch(upsertToDelta).outputMode("update").start())
# this is my updated batch function, now doing the grouping
def upsertToDelta(microbatchdf, batchId):
microbatchdf = microbatchdf.groupBy("sensorId").agg(max("temp").cast("int").alias("temp"))
microbatchdf = microbatchdf.withColumn("latestDatetime", current_timestamp())
microbatchdf.createOrReplaceTempView("updates")
microbatchdf._jdf.sparkSession().sql("""
MERGE INTO latestSensorReadings t
USING updates s
ON s.sensorId = t.sensorId
WHEN MATCHED THEN UPDATE SET *
WHEN NOT MATCHED THEN INSERT *
""")