Databricks - 从增量读取流 table writestream 到 orc 文件仅更改
Databricks - readstream from delta table writestream to orc file only with changes
管道每 20 分钟运行一次,以 ORC 格式将数据推送到 ADLS Gen2 存储。
- 我有一个每 1 小时运行一次的 Azure Databricks 笔记本作业。
此作业从 ADLS 读取 orc 文件作为结构化流(由上述管道创建的 orc 文件),然后使用合并功能根据 primaryKey 列将数据更新到增量 table。
event_readstream = (
spark.readStream
.format("orc")
.schema('my-schema.json')
.load('/mnt/path/from/adls/to/orc/files/')
)
...
def upsertEventToDeltaTable(df, batch_id):
input_df = (
df
.drop('address_line3')
.dropDuplicates(['primaryKey'])
.withColumn(partition_column, F.date_format(F.col('updateddate'), 'yyyy').cast('int'))
)
input_df.createOrReplaceTempView("event_df_updates")
input_df._jdf.sparkSession().sql("""
MERGE INTO events dbEvent
USING event_df_updates dfEvent
ON dbEvent.primaryKey = dfEvent.primaryKey
WHEN MATCHED THEN UPDATE SET *
WHEN NOT MATCHED THEN INSERT *
""")
event_writestream = (
event_readStream
.drop('adddress_line3') #some column to be dropped
.repartition(1)
.writeStream
.trigger(once=True)
.format("delta")
.option("checkpointLocation", "{}/checkpoints/{}".format(adls_mount_path,mytable))
.foreachBatch(upsertEventToDeltaTable)
.start()
)
- 同一台笔记本还使用读取流(结构化流)并将数据直接写入 ADLS Gen2 存储中的不同位置。这也使用启用了
checkpoint
选项的 writestream 中的 forEachBatch()
。
def loadToLocation(df, batch_id):
(
df
.repartition(1)
.write
.partitionBy('updateddate')
.format("orc")
.mode("append")
.save('{}/event/data'.format(adls_mount_path))
)
location_writestream = (
event_readstream # Same read stream is used here
.writeStream
.trigger(once=True)
.option("checkpointLocation", "{}/checkpoints/event".format(adls_mount_path))
.foreachBatch(loadToLocation)
.start()
)
问题:
- 如果我创建新的读取流(从增量 table 读取)和写入流(写入 ORC 文件),Delta table 每 1 小时更新一次数据。
ORC 文件是否仅包含在增量 table 中合并的更改? [详情如下]
- 如果只将更改或更新的数据写入 ORC 文件,这种方法是否有任何问题?
在上面第 2 点中,不使用 readStream(从 orc 文件读取),而是使用 Delta table 路径创建一个新的 readStream,如下所示
deltatbl_event_readstream = spark.readStream.format("delta")
.load("/mnt/delta/myadlsaccnt/user_events") # my delta table location
- 并使用如下不同的写入流
def loadToLocation(df, batch_id):
(
df
.repartition(1)
.write
.partitionBy('updateddate')
.format("orc")
.mode("append")
.save('{}/event/data'.format(adls_mount_path))
)
deltatbl_event_readstream
.writeStream
.trigger(once=True)
.option("checkpointLocation", "{}/checkpoints/event".format(adls_mount_path))
.foreachBatch(loadToLocation) # re-using the same method.
.start()
如果您只是在 Delta table 上使用普通 readStream
而没有任何选项,那么您将不会获得有关更新的信息。事实上,在您设置选项 ignoreChanges
之前,更新后流将失败。这是因为 Delta 不跟踪更改,当您进行 update/delete 时,它正在重写现有文件,因此通过查看文件您只会看到数据,而不知道它是插入还是更新。
但是如果您需要从 Delta 流式传输更改,则可以使用 Delta 8.4 中引入的 Delta Change Data Feed (CDF) 功能(如果我没记错的话)。要使其正常工作,您需要通过将 属性 delta.enableChangeDataFeed
设置为 true
在源增量 table 上启用它。从那个版本开始,您将能够阅读更改提要,就像这样:
deltatbl_event_readstream = spark.readStream.format("delta")\
.option("readChangeFeed", "true") \
.option("startingVersion", <version_of_delta_when_you_enable_cdf>) \
.load("...")
这将添加三个额外的列,描述执行的操作、Delta 版本和时间戳。如果您只需要跟踪更改,则只需要 select _change_type
列具有值 update_postimage
的行,然后您可以将该数据存储在任何您需要的地方。
但请注意,在 table 上启用 CDF 后,其他客户端(DBR < 8.4,OSS)将无法写入 table,尽管他们将继续读取数据。
我遇到了 link DATA+AI summit,其中有针对这种情况的演示。
就我而言,每个批次都有 >90% 的新行,更新较少。所以我不能使用这个选项。这可能会对其他人有所帮助。
以下类似于 Alex Ott 的回答,添加了额外的信息
根据建议,如果批量更新较多,CDF 可能无效。
- 要启用 CDF 功能:
%sql
ALTER table <table-name> SET TBLPROPERTIES (delta.enableChangeDataFeed = true)
- 执行任何 update/insert 操作 table
- 使用
table_changes()
函数查看变化
%sql
select * from table_changes('<table-name>',<start-commit-version>,<end-commit-version>
- 读取为流
event_read_stream = spark.readStream
.format("delta")
.option("readChangeFeed", "true")
.option("startingVersion", "latest")
.table("event") #// table name
.filter("_change_type != 'update_preimage'")
- 创建合并更改的 upsert 函数
- 写入流以写入信息
event_read_stream.writeStream.format("delta")
.trigger(processingTime = "2 seconds") # if in case if job use once
.outputMode("append")
.start()
管道每 20 分钟运行一次,以 ORC 格式将数据推送到 ADLS Gen2 存储。
- 我有一个每 1 小时运行一次的 Azure Databricks 笔记本作业。 此作业从 ADLS 读取 orc 文件作为结构化流(由上述管道创建的 orc 文件),然后使用合并功能根据 primaryKey 列将数据更新到增量 table。
event_readstream = (
spark.readStream
.format("orc")
.schema('my-schema.json')
.load('/mnt/path/from/adls/to/orc/files/')
)
...
def upsertEventToDeltaTable(df, batch_id):
input_df = (
df
.drop('address_line3')
.dropDuplicates(['primaryKey'])
.withColumn(partition_column, F.date_format(F.col('updateddate'), 'yyyy').cast('int'))
)
input_df.createOrReplaceTempView("event_df_updates")
input_df._jdf.sparkSession().sql("""
MERGE INTO events dbEvent
USING event_df_updates dfEvent
ON dbEvent.primaryKey = dfEvent.primaryKey
WHEN MATCHED THEN UPDATE SET *
WHEN NOT MATCHED THEN INSERT *
""")
event_writestream = (
event_readStream
.drop('adddress_line3') #some column to be dropped
.repartition(1)
.writeStream
.trigger(once=True)
.format("delta")
.option("checkpointLocation", "{}/checkpoints/{}".format(adls_mount_path,mytable))
.foreachBatch(upsertEventToDeltaTable)
.start()
)
- 同一台笔记本还使用读取流(结构化流)并将数据直接写入 ADLS Gen2 存储中的不同位置。这也使用启用了
checkpoint
选项的 writestream 中的forEachBatch()
。
def loadToLocation(df, batch_id):
(
df
.repartition(1)
.write
.partitionBy('updateddate')
.format("orc")
.mode("append")
.save('{}/event/data'.format(adls_mount_path))
)
location_writestream = (
event_readstream # Same read stream is used here
.writeStream
.trigger(once=True)
.option("checkpointLocation", "{}/checkpoints/event".format(adls_mount_path))
.foreachBatch(loadToLocation)
.start()
)
问题:
- 如果我创建新的读取流(从增量 table 读取)和写入流(写入 ORC 文件),Delta table 每 1 小时更新一次数据。 ORC 文件是否仅包含在增量 table 中合并的更改? [详情如下]
- 如果只将更改或更新的数据写入 ORC 文件,这种方法是否有任何问题?
在上面第 2 点中,不使用 readStream(从 orc 文件读取),而是使用 Delta table 路径创建一个新的 readStream,如下所示
deltatbl_event_readstream = spark.readStream.format("delta")
.load("/mnt/delta/myadlsaccnt/user_events") # my delta table location
- 并使用如下不同的写入流
def loadToLocation(df, batch_id):
(
df
.repartition(1)
.write
.partitionBy('updateddate')
.format("orc")
.mode("append")
.save('{}/event/data'.format(adls_mount_path))
)
deltatbl_event_readstream
.writeStream
.trigger(once=True)
.option("checkpointLocation", "{}/checkpoints/event".format(adls_mount_path))
.foreachBatch(loadToLocation) # re-using the same method.
.start()
如果您只是在 Delta table 上使用普通 readStream
而没有任何选项,那么您将不会获得有关更新的信息。事实上,在您设置选项 ignoreChanges
之前,更新后流将失败。这是因为 Delta 不跟踪更改,当您进行 update/delete 时,它正在重写现有文件,因此通过查看文件您只会看到数据,而不知道它是插入还是更新。
但是如果您需要从 Delta 流式传输更改,则可以使用 Delta 8.4 中引入的 Delta Change Data Feed (CDF) 功能(如果我没记错的话)。要使其正常工作,您需要通过将 属性 delta.enableChangeDataFeed
设置为 true
在源增量 table 上启用它。从那个版本开始,您将能够阅读更改提要,就像这样:
deltatbl_event_readstream = spark.readStream.format("delta")\
.option("readChangeFeed", "true") \
.option("startingVersion", <version_of_delta_when_you_enable_cdf>) \
.load("...")
这将添加三个额外的列,描述执行的操作、Delta 版本和时间戳。如果您只需要跟踪更改,则只需要 select _change_type
列具有值 update_postimage
的行,然后您可以将该数据存储在任何您需要的地方。
但请注意,在 table 上启用 CDF 后,其他客户端(DBR < 8.4,OSS)将无法写入 table,尽管他们将继续读取数据。
我遇到了 link DATA+AI summit,其中有针对这种情况的演示。
就我而言,每个批次都有 >90% 的新行,更新较少。所以我不能使用这个选项。这可能会对其他人有所帮助。
以下类似于 Alex Ott 的回答,添加了额外的信息
根据建议,如果批量更新较多,CDF 可能无效。
- 要启用 CDF 功能:
%sql
ALTER table <table-name> SET TBLPROPERTIES (delta.enableChangeDataFeed = true)
- 执行任何 update/insert 操作 table
- 使用
table_changes()
函数查看变化
%sql
select * from table_changes('<table-name>',<start-commit-version>,<end-commit-version>
- 读取为流
event_read_stream = spark.readStream
.format("delta")
.option("readChangeFeed", "true")
.option("startingVersion", "latest")
.table("event") #// table name
.filter("_change_type != 'update_preimage'")
- 创建合并更改的 upsert 函数
- 写入流以写入信息
event_read_stream.writeStream.format("delta")
.trigger(processingTime = "2 seconds") # if in case if job use once
.outputMode("append")
.start()