结构化流输出 - 使用 OPTIMIZE 压缩而不破坏传出读取流顺序保证

Structured streaming output - compacting with OPTIMIZE without breaking outgoing read stream order guarantees

我有一个来自 kafka 使用结构化流的传入“仅附加”更新流。使用 foreachBatch 并在其中写入:

parsedDf \
    .select("parsedId", "ingestionDate","parsedValue.after", "parsedValue.patch", "parsedValue.op", "parsedvalue.ts_ms",  'partition', 'offset') \
    .write \
    .format("delta") \
    .mode("append") \
    .option("mergeSchema", "true") \
    .save(f"/mnt/defaultDatalake/{append_table_name}")

稍后,在下游作业中,我有一个 readStream 读取在那里创建的文件。

问题 - 这些作业创建了大量文件,因为主题不是很完整。虽然下游作业对此很好(从流的末尾读取),但我还需要直接查询此数据(追加 table),但由于涉及的文件数量很多,因此查询非常长。

当然,我已经尝试在此存储上使用 OPTIMIZE,但似乎破坏了使用这些文件的 readStream 的顺序保证。

所以我需要的是 - 一种将小文件滚动到更大文件(比方说 - 超过一周)的方法,而不会破坏对下游消费者的严格订单保证(即使它需要重新读取早期的数据) )

Spark 3。运行 数据块 7.5

Databrick 的增量保证

OPTIMIZE guarantees:

Performing OPTIMIZE on a table that is a streaming source does not affect 
any current or future streams that treat this table as a source

所以这要么是 Delta 的 OPTIMIZE 中的错误,要么是您的代码中的错误。 关于 OPTIMIZE 我不能说什么 - 它不是开源的。

建议

compaction的手动方式: (我是 运行 我项目中以下内容的更复杂版本)尝试遵循。请注意,dataChange 选项对于将 delta 接收器用作流媒体源非常重要。

spark.read \
        .format("delta") \
        .load(root_path) \
        .write \
        .format("delta") \
        .option("dataChange", "false") \
        .mode("overwrite") \
        .save(root_path)

分区压缩,根据 https://mungingdata.com/delta-lake/compact-small-files/,加上我的 dataChange:

spark.read\
  .format("delta")\
  .load(table)\
  .where(partition)\
  .repartition(numFiles)\
  .write\
  .format("delta")\
  .option("dataChange", "false") \
  .mode("overwrite")\
  .option("replaceWhere", partition)\
  .save(table) 

备注

请注意,通过 S3 处理多个并发写入作业是 not supported。这可能是问题的来源之一。