结构化流 startingOffest 和 Checkpoint

Structured Streaming startingOffest and Checkpoint

我对结构化流中的 startingOffsets 感到困惑。 在官方文档here中,它说查询类型

  1. 串流 - 这是连续串流吗?
  2. Batch - 这是用于使用 forEachBatch 或触发器进行查询吗? (不允许最新)

我的工作流程也启用了 checkpoints。这如何与 startingOffsets 一起工作? 如果我的工作流程崩溃并且我有 startingOffsets 作为 latest,spark 是否检查 kafka 偏移量或 spark 检查点偏移量或两者?

Streaming 在 Spark 中默认表示“微批处理”。根据您设置的触发器,它将以给定的频率检查新数据的来源。您可以通过

使用它
val df = spark
  .readStream
  .format("kafka")
  .[...]

对于 Kafka,还有实验性的 continuous 触发器,它允许以相当低的延迟处理数据。请参阅文档中的 Continuous Processing 部分。

另一方面,

Batch 就像读取一次文本文件(例如 csv)一样。您可以使用

val df = spark
  .read
  .format("kafka")
  .[...]

请注意流处理 readStream 和批处理 read 的区别。在批处理模式下,startingOffset 只能设置为 earliest,即使您使用检查点,它也将始终从最早的偏移量开始,以防计划内或计划外的重启。

结构化流中的检查点需要在 writeStream 部分中设置,并且每个查询都必须是唯一的(以防您 运行 来自同一来源的多个流式查询)。如果您设置了该检查点位置并重新启动您的应用程序,Spark 将只查看这些检查点文件。只有当查询第一次开始时,它才会检查 startingOffset 选项。

请记住,结构化流 永远不会 将任何偏移量提交回 Kafka。它只依赖于它的检查点文件。请参阅我在 How to manually set group.id and commit kafka offsets in spark structured streaming? 上的其他回答。

如果您计划 运行 您的应用程序,例如,一天一次,那么最好使用 readStream 并启用检查点和触发器 writeStream.trigger(Trigger.Once)Running Streaming Jobs Once a Day For 10x Cost Savings.

上的 Databricks 博客对此方法给出了很好的解释