Kafka Streams:如何确保在处理完成后提交偏移量

Kafka Streams: How to ensure offset is committed after processing is completed

我想使用 Kafka 流处理 Kafka 主题中存在的消息。

处理的最后一步是将结果放入数据库table。为避免数据库争用相关问题(程序将 运行 24*7 处理数百万条消息),我将对 JDBC 调用使用批处理。

但在这种情况下,有可能会丢失消息(在一个场景中,我从一个主题中读取了 500 条消息,流将标记偏移量,现在程序失败。消息出现在 JDBC 批次中更新丢失但偏移量被标记为这些消息)。

我想在数据库 insert/update 完成后手动标记最后一条消息的偏移量,但是根据以下问题是不可能的:How to commit manually with Kafka Stream?.

有人可以提出任何可能的解决方案吗

Kafka Stream不支持手动提交,同时也不支持批处理。 关于您的用例,可能性很小:

  1. 使用Normal consumer并实现批处理和控制手动偏移。

  2. 按照以下方式使用 Spark Kafka 结构化流 Kafka Spark Structured Stream

  3. 试试Spring卡夫卡 [Spring Kafka]2

  4. 在这种情况下,也可以考虑 JDBC Kafka 连接器。 Kafka JDBC Connector

正如@sun007 的回答中提到的,我宁愿稍微改变一下你的方法:

  • 使用Kafka Streams处理输入数据。让 Kafka Streams 应用程序将其输出写入 Kafka,而不是关系数据库。
  • 使用 Kafka Connect(例如,即用型 JDBC 连接器)将数据从 Kafka 摄取到关系数据库。根据需要配置和调整连接器,例如用于批量插入数据库。

这种 processing (Kafka Streams) 和 ingestion (Kafka Connect) 的解耦通常是更可取的设计。例如,您不再将处理步骤与数据库的可用性相结合:如果数据库关闭,您的 KStreams 应用程序为什么要停止?这是一个与处理逻辑无关的操作问题,您当然不想处理超时、重试等问题。 (即使你使用 Kafka Streams 以外的工具进行处理,这种解耦仍然是一个更可取的设置。)