Databricks - 从增量读取流 table writestream 到 orc 文件仅更改

Databricks - readstream from delta table writestream to orc file only with changes

管道每 20 分钟运行一次,以 ORC 格式将数据推送到 ADLS Gen2 存储。

  1. 我有一个每 1 小时运行一次的 Azure Databricks 笔记本作业。 此作业从 ADLS 读取 orc 文件作为结构化流(由上述管道创建的 orc 文件),然后使用合并功能根据 primaryKey 列将数据更新到增量 table。
event_readstream = ( 
    spark.readStream
    .format("orc")
    .schema('my-schema.json')
    .load('/mnt/path/from/adls/to/orc/files/')
  )
  ...

def upsertEventToDeltaTable(df, batch_id): 
  input_df = (
    df
    .drop('address_line3')
    .dropDuplicates(['primaryKey'])
    .withColumn(partition_column, F.date_format(F.col('updateddate'), 'yyyy').cast('int'))
  )
  
  input_df.createOrReplaceTempView("event_df_updates")
  
  input_df._jdf.sparkSession().sql("""
    MERGE INTO events dbEvent
    USING event_df_updates dfEvent
    ON dbEvent.primaryKey = dfEvent.primaryKey
    WHEN MATCHED THEN UPDATE SET *
    WHEN NOT MATCHED THEN INSERT *                     
                             """)

event_writestream = ( 
    event_readStream
    .drop('adddress_line3')  #some column to be dropped
    .repartition(1)
    .writeStream
    .trigger(once=True)
    .format("delta")
    .option("checkpointLocation", "{}/checkpoints/{}".format(adls_mount_path,mytable))
    .foreachBatch(upsertEventToDeltaTable)
    .start()
  )
  1. 同一台笔记本还使用读取流(结构化流)并将数据直接写入 ADLS Gen2 存储中的不同位置。这也使用启用了 checkpoint 选项的 writestream 中的 forEachBatch()

def loadToLocation(df, batch_id): 
  (
    df
    .repartition(1)
    .write
    .partitionBy('updateddate')
    .format("orc")
    .mode("append")
    .save('{}/event/data'.format(adls_mount_path))  
  )

location_writestream = ( 
  event_readstream   # Same read stream is used here 
  .writeStream
  .trigger(once=True)
  .option("checkpointLocation", "{}/checkpoints/event".format(adls_mount_path))
  .foreachBatch(loadToLocation)
  .start()
)

问题:

在上面第 2 点中,不使用 readStream(从 orc 文件读取),而是使用 Delta table 路径创建一个新的 readStream,如下所示

deltatbl_event_readstream = spark.readStream.format("delta")
  .load("/mnt/delta/myadlsaccnt/user_events")  # my delta table location
def loadToLocation(df, batch_id): 
  (
    df
    .repartition(1)
    .write
    .partitionBy('updateddate')
    .format("orc")
    .mode("append")
    .save('{}/event/data'.format(adls_mount_path))  
  )

deltatbl_event_readstream
   .writeStream
  .trigger(once=True)
  .option("checkpointLocation", "{}/checkpoints/event".format(adls_mount_path))
  .foreachBatch(loadToLocation)  # re-using the same method.
  .start()

如果您只是在 Delta table 上使用普通 readStream 而没有任何选项,那么您将不会获得有关更新的信息。事实上,在您设置选项 ignoreChanges 之前,更新后流将失败。这是因为 Delta 不跟踪更改,当您进行 update/delete 时,它正在重写现有文件,因此通过查看文件您只会看到数据,而不知道它是插入还是更新。

但是如果您需要从 Delta 流式传输更改,则可以使用 Delta 8.4 中引入的 Delta Change Data Feed (CDF) 功能(如果我没记错的话)。要使其正常工作,您需要通过将 属性 delta.enableChangeDataFeed 设置为 true 在源增量 table 上启用它。从那个版本开始,您将能够阅读更改提要,就像这样:

deltatbl_event_readstream = spark.readStream.format("delta")\
  .option("readChangeFeed", "true") \
  .option("startingVersion", <version_of_delta_when_you_enable_cdf>) \
  .load("...")

这将添加三个额外的列,描述执行的操作、Delta 版本和时间戳。如果您只需要跟踪更改,则只需要 select _change_type 列具有值 update_postimage 的行,然后您可以将该数据存储在任何您需要的地方。

但请注意,在 table 上启用 CDF 后,其他客户端(DBR < 8.4,OSS)将无法写入 table,尽管他们将继续读取数据。

我遇到了 link DATA+AI summit,其中有针对这种情况的演示。

就我而言,每个批次都有 >90% 的新行,更新较少。所以我不能使用这个选项。这可能会对其他人有所帮助。

以下类似于 Alex Ott 的回答,添加了额外的信息

根据建议,如果批量更新较多,CDF 可能无效。

  • 要启用 CDF 功能:
%sql
ALTER table <table-name> SET TBLPROPERTIES (delta.enableChangeDataFeed = true)
  • 执行任何 update/insert 操作 table
  • 使用table_changes()函数查看变化
%sql 
select * from table_changes('<table-name>',<start-commit-version>,<end-commit-version>
  • 读取为流
event_read_stream = spark.readStream
     .format("delta")
     .option("readChangeFeed", "true")
     .option("startingVersion", "latest")
     .table("event") #// table name 
     .filter("_change_type != 'update_preimage'")
  • 创建合并更改的 upsert 函数
  • 写入流以写入信息
event_read_stream.writeStream.format("delta")
  .trigger(processingTime = "2 seconds") # if in case if job use once
  .outputMode("append")
  .start()