如何在使用合并更新插入后从 Delta table 获取 new/updated 记录?
How to get new/updated records from Delta table after upsert using merge?
在 Spark Streaming 作业中,有什么方法可以将 upsert using merge 之后的 updated/inserted 行获取到 Delta table?
val df = spark.readStream(...)
val deltaTable = DeltaTable.forName("...")
def upsertToDelta(events: DataFrame, batchId: Long) {
deltaTable.as("table")
.merge(
events.as("event"),
"event.entityId == table.entityId")
.whenMatched()
.updateExpr(...))
.whenNotMatched()
.insertAll()
.execute()
}
df
.writeStream
.format("delta")
.foreachBatch(upsertToDelta _)
.outputMode("update")
.start()
我知道我可以创建另一个作业来读取增量 table 的更新。但是有可能做同样的工作吗?据我所知,execute() returns Unit.
您可以在 table 上启用 Change Data Feed,然后使用另一个流或批处理作业来获取更改,这样您就能够收到有关哪些行具有 changed/deleted/inserted.它可以通过以下方式启用:
ALTER TABLE table_name SET TBLPROPERTIES (delta.enableChangeDataFeed = true)
如果 thable 未注册,您可以使用路径代替 table 名称:
ALTER TABLE delta.`path` SET TBLPROPERTIES (delta.enableChangeDataFeed = true)
如果您在从 table:
读取流时添加 .option("readChangeFeed", "true")
选项,则更改将可用
spark.readStream.format("delta") \
.option("readChangeFeed", "true") \
.table("table_name")
它会在 table 中添加三列来描述更改 - 最重要的是 _change_type
(请注意更新操作有两种不同的类型)。
如果您担心有另一个流 - 这不是问题,因为您可以 运行 同一作业中的多个流 - 您只是不需要使用 .awaitTermination
,但是类似 spark.streams.awaitAnyTermination()
的东西等待多个流。
P.S。但是,如果您解释为什么需要在同一份工作中进行更改,也许这个答案会改变?
在 Spark Streaming 作业中,有什么方法可以将 upsert using merge 之后的 updated/inserted 行获取到 Delta table?
val df = spark.readStream(...)
val deltaTable = DeltaTable.forName("...")
def upsertToDelta(events: DataFrame, batchId: Long) {
deltaTable.as("table")
.merge(
events.as("event"),
"event.entityId == table.entityId")
.whenMatched()
.updateExpr(...))
.whenNotMatched()
.insertAll()
.execute()
}
df
.writeStream
.format("delta")
.foreachBatch(upsertToDelta _)
.outputMode("update")
.start()
我知道我可以创建另一个作业来读取增量 table 的更新。但是有可能做同样的工作吗?据我所知,execute() returns Unit.
您可以在 table 上启用 Change Data Feed,然后使用另一个流或批处理作业来获取更改,这样您就能够收到有关哪些行具有 changed/deleted/inserted.它可以通过以下方式启用:
ALTER TABLE table_name SET TBLPROPERTIES (delta.enableChangeDataFeed = true)
如果 thable 未注册,您可以使用路径代替 table 名称:
ALTER TABLE delta.`path` SET TBLPROPERTIES (delta.enableChangeDataFeed = true)
如果您在从 table:
读取流时添加.option("readChangeFeed", "true")
选项,则更改将可用
spark.readStream.format("delta") \
.option("readChangeFeed", "true") \
.table("table_name")
它会在 table 中添加三列来描述更改 - 最重要的是 _change_type
(请注意更新操作有两种不同的类型)。
如果您担心有另一个流 - 这不是问题,因为您可以 运行 同一作业中的多个流 - 您只是不需要使用 .awaitTermination
,但是类似 spark.streams.awaitAnyTermination()
的东西等待多个流。
P.S。但是,如果您解释为什么需要在同一份工作中进行更改,也许这个答案会改变?