如何在使用合并更新插入后从 Delta table 获取 new/updated 记录?

How to get new/updated records from Delta table after upsert using merge?

在 Spark Streaming 作业中,有什么方法可以将 upsert using merge 之后的 updated/inserted 行获取到 Delta table?


val df = spark.readStream(...)
val deltaTable = DeltaTable.forName("...")


def upsertToDelta(events: DataFrame, batchId: Long) {

deltaTable.as("table")
    .merge(
      events.as("event"), 
      "event.entityId == table.entityId")
    .whenMatched()
        .updateExpr(...))
    .whenNotMatched()
      .insertAll()
    .execute()
}

df
  .writeStream
  .format("delta")
  .foreachBatch(upsertToDelta _)
  .outputMode("update")
  .start()

我知道我可以创建另一个作业来读取增量 table 的更新。但是有可能做同样的工作吗?据我所知,execute() returns Unit.

您可以在 table 上启用 Change Data Feed,然后使用另一个流或批处理作业来获取更改,这样您就能够收到有关哪些行具有 changed/deleted/inserted.它可以通过以下方式启用:

ALTER TABLE table_name SET TBLPROPERTIES (delta.enableChangeDataFeed = true)

如果 thable 未注册,您可以使用路径代替 table 名称:

ALTER TABLE delta.`path` SET TBLPROPERTIES (delta.enableChangeDataFeed = true)

如果您在从 table:

读取流时添加 .option("readChangeFeed", "true") 选项,则更改将可用
spark.readStream.format("delta") \
  .option("readChangeFeed", "true") \
  .table("table_name")

它会在 table 中添加三列来描述更改 - 最重要的是 _change_type(请注意更新操作有两种不同的类型)。

如果您担心有另一个流 - 这不是问题,因为您可以 运行 同一作业中的多个流 - 您只是不需要使用 .awaitTermination,但是类似 spark.streams.awaitAnyTermination() 的东西等待多个流。

P.S。但是,如果您解释为什么需要在同一份工作中进行更改,也许这个答案会改变?