跟踪 Spark 结构化流中消耗的消息

Track of consumed messages in Spark structured streaming

我想设置配置,让我的应用程序跟踪来自 kafka 的消费消息。因此,无论何时失败,它都可以从上次提交或消耗的偏移量开始选择。

readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("subscribe", "topic1")
  .load()
  .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
  .writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("topic", "topic1")
  .trigger(Trigger.Continuous("1 second"))  // only change in query
  .start();

我在网上看到 checkpointlocation 属性 可以设置,spark 可以使用它来跟踪偏移量。

想知道我可以在哪里设置这个 属性 吗?我可以在上面的代码中设置 option 吗?我可以知道如何正确设置它吗?

其次,我无法理解trigger(Trigger.Continuous("1 second")) 属性。文档说 continuous processing engine will record the progress of the query every second,它在读取来自 kafka 的消息时记录了什么样的进度?

您可以将检查点位置设置为 writeStream 中的一个选项:

[...]
.writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("topic", "topic1")
  .option("checkpointLocation", "/path/to/dir")
  .trigger(Trigger.Continuous("1 second"))
  .start();

从 Kafka 读取时跟踪进度意味着跟踪 TopicPartition 中消耗的偏移量。设置检查点位置将使您的应用程序能够将该信息作为 JSON 对象存储在给定路径中,例如

{
  "topic1":{
    "0":11, 
    "1":101
  }
}

这意味着应用程序已经使用了主题 topic1 的分区 0 中的偏移量 10 和分区 1 中的偏移量 100。检查点是“提前”写入的(使用预写日志),因此应用程序将继续从 Kafka 读取消息,它在有意或无意(失败)重启之前停止的地方。

Trigger.Continuous 从 Spark 版本 2.3 开始可用。并且截至目前标记为 实验性 。与 micro-batch 方法相比,它会在 Kafka 中的每条消息到达主题时立即获取它,而无需尝试将其与其他消息进行批处理。这可以改善延迟,但很可能会降低整体吞吐量。

参数(例如1 seconds)决定检查点的频率。

使用此触发模式时,重要的是至少要有与主题有分区一样多的可用内核。否则,申请将不会取得任何进展。您可以阅读更多相关信息 here:

"For example, if you are reading from a Kafka topic that has 10 partitions, then the cluster must have at least 10 cores for the query to make progress."