Spark Structured Streaming - kafka 偏移量处理
Spark Structured Streaming - kafka offset handling
当我从最新的偏移量启动我的 Spark Structured Streaming 3.0.1 应用程序时,它运行良好。但是当我想从一些较早的偏移量开始时 - 例如:
- startingOffsets 到“最早”
- startingOffsets 到特定偏移量,例如 {"MyTopic-v1":{"0":1686734237}}
我可以在日志中看到起始偏移量被正确拾取,但是随后发生了一系列查找(从我定义的位置开始)直到它到达当前的最新偏移量。
我放弃了我的检查点目录并尝试了几个选项,但情况总是一样的 - 它报告正确的起始偏移量,但需要很长时间才能慢慢寻找最近的并开始处理 - 知道为什么我还应该检查什么?
2021-02-19 14:52:23 INFO KafkaConsumer:1564 - [...] Seeking to offset 1786734237 for partition MyTopic-v1-0
2021-02-19 14:52:23 INFO KafkaConsumer:1564 - [...] Seeking to offset 1786734737 for partition MyTopic-v1-0
2021-02-19 14:52:23 INFO KafkaConsumer:1564 - [...] Seeking to offset 1786735237 for partition MyTopic-v1-0
2021-02-19 14:52:23 INFO KafkaConsumer:1564 - [...] Seeking to offset 1786735737 for partition MyTopic-v1-0
2021-02-19 14:52:23 INFO KafkaConsumer:1564 - [...] Seeking to offset 1786736237 for partition MyTopic-v1-0
2021-02-19 14:52:23 INFO KafkaConsumer:1564 - [...] Seeking to offset 1786736737 for partition MyTopic-v1-0
2021-02-19 14:52:23 INFO KafkaConsumer:1564 - [...] Seeking to offset 1786737237 for partition MyTopic-v1-0
我离开应用程序 运行 更长时间,它最终开始生成文件,但是我的 100 秒处理触发器没有满足,数据出现的时间很晚 - 20-30 分钟后。
(我也在 spark 2.4.5 上测试过 - 同样的问题 - 也许是一些 kafka 配置?)
如您所示,将选项 startingOffsets
与 JSON 对象一起使用应该可以正常工作。
您观察到的是,在应用程序第一次启动时,结构化流作业将从提供的 (1686734237) 读取所有(!)偏移量,直到主题中的最后一个可用偏移量。由于这可能是相当多的消息,因此大块的处理将使第一个微批处理非常繁忙。
请记住,Trigger
选项只是定义了微批处理的触发频率。您应该确保将此触发率与预期的处理时间保持一致。我在这里基本上看到两个选项:
- 使用选项
maxOffsetsPerTriger
限制每个触发器/微批从 Kafka 获取的偏移量
- 避免使用任何触发器,因为这将允许您的流在默认情况下在前一个触发器完成数据处理后立即触发
当我从最新的偏移量启动我的 Spark Structured Streaming 3.0.1 应用程序时,它运行良好。但是当我想从一些较早的偏移量开始时 - 例如:
- startingOffsets 到“最早”
- startingOffsets 到特定偏移量,例如 {"MyTopic-v1":{"0":1686734237}}
我可以在日志中看到起始偏移量被正确拾取,但是随后发生了一系列查找(从我定义的位置开始)直到它到达当前的最新偏移量。
我放弃了我的检查点目录并尝试了几个选项,但情况总是一样的 - 它报告正确的起始偏移量,但需要很长时间才能慢慢寻找最近的并开始处理 - 知道为什么我还应该检查什么?
2021-02-19 14:52:23 INFO KafkaConsumer:1564 - [...] Seeking to offset 1786734237 for partition MyTopic-v1-0
2021-02-19 14:52:23 INFO KafkaConsumer:1564 - [...] Seeking to offset 1786734737 for partition MyTopic-v1-0
2021-02-19 14:52:23 INFO KafkaConsumer:1564 - [...] Seeking to offset 1786735237 for partition MyTopic-v1-0
2021-02-19 14:52:23 INFO KafkaConsumer:1564 - [...] Seeking to offset 1786735737 for partition MyTopic-v1-0
2021-02-19 14:52:23 INFO KafkaConsumer:1564 - [...] Seeking to offset 1786736237 for partition MyTopic-v1-0
2021-02-19 14:52:23 INFO KafkaConsumer:1564 - [...] Seeking to offset 1786736737 for partition MyTopic-v1-0
2021-02-19 14:52:23 INFO KafkaConsumer:1564 - [...] Seeking to offset 1786737237 for partition MyTopic-v1-0
我离开应用程序 运行 更长时间,它最终开始生成文件,但是我的 100 秒处理触发器没有满足,数据出现的时间很晚 - 20-30 分钟后。
(我也在 spark 2.4.5 上测试过 - 同样的问题 - 也许是一些 kafka 配置?)
如您所示,将选项 startingOffsets
与 JSON 对象一起使用应该可以正常工作。
您观察到的是,在应用程序第一次启动时,结构化流作业将从提供的 (1686734237) 读取所有(!)偏移量,直到主题中的最后一个可用偏移量。由于这可能是相当多的消息,因此大块的处理将使第一个微批处理非常繁忙。
请记住,Trigger
选项只是定义了微批处理的触发频率。您应该确保将此触发率与预期的处理时间保持一致。我在这里基本上看到两个选项:
- 使用选项
maxOffsetsPerTriger
限制每个触发器/微批从 Kafka 获取的偏移量 - 避免使用任何触发器,因为这将允许您的流在默认情况下在前一个触发器完成数据处理后立即触发