在处理来自 Kafka 的消息时避免数据丢失

Avoid Data Loss While Processing Messages from Kafka

寻找设计我的 Kafka 消费者的最佳方法。基本上我想看看避免数据丢失的最佳方法是什么 exception/errors 正在处理消息。

我的用例如下。

a) 我使用 SERVICE 来处理消息的原因是 - 将来我计划编写一个 ERROR PROCESSOR 应用程序,它将在一天结束时 运行 尝试再次处理失败的消息(不是所有消息,而是由于任何依赖项(如父级缺失)而失败的消息)。

b) 我想确保消息丢失为零,因此我会将消息保存到文件中,以防在将消息保存到数据库时出现任何问题。

c) 在生产环境中,可以有多个消费者和服务实例 运行ning,因此很可能有多个应用程序尝试写入 同一个文件。

Q-1) 写入文件是避免数据丢失的唯一选择吗?

Q-2) 如果是唯一选项,如何保证多个应用程序同时写入同一个文件并同时读取?以后请考虑错误处理器 正在构建,它可能正在从同一个文件读取消息,而另一个应用程序正在尝试写入该文件。

ERROR PROCESSOR - 我们的来源遵循事件驱动机制,有时依赖事件(例如,某物的父实体)很可能会延迟几天。所以在那种情况下,我希望我的 ERROR PROCESSOR 多次处理相同的消息。

我以前 运行 也有过类似的经历。因此,直接进入您的问题:

  • 不一定,您也许可以在新主题中将这些消息发送回 Kafka(比方说 - error-topic)。因此,当您的错误处理器准备就绪时,它可以只监听 this error-topic 并在消息传入时使用它们。

  • 我认为这个问题已经在回答第一个问题时得到了解决。因此,与其使用一个文件读写并打开多个文件句柄来同时执行此操作,Kafka 可能是更好的选择,因为它专为此类问题而设计。

注意:以下几点只是基于我对你的问题领域的有限理解的一些思考。因此,您可以选择安全地忽略它。

service 组件的设计中还有一点值得考虑——您不妨考虑通过将所有错误消息发送回 Kafka 来合并第 4 点和第 5 点。这将使您能够以一致的方式处理所有错误消息,而不是将一些消息放在错误数据库中,而另一些消息放在 Kafka 中。

编辑:根据有关 ERROR PROCESSOR 要求的附加信息,这里是解决方案设计的示意图。

我现在故意保留 ERROR PROCESSOR 的输出抽象只是为了让它通用。

希望对您有所帮助!

如果您在写入数据库之前不提交消费的消息,那么在 Kafka 保留消息的同时不会丢失任何内容。这样做的权衡是,如果消费者确实提交给数据库,但 Kafka 偏移量提交失败或超时,您最终将再次使用记录并可能在您的服务中处理重复项。

即使您确实写入了一个文件,您也不会被 gua运行teed 排序,除非您为每个分区打开一个文件,并确保所有消费者仅 运行 在一台机器上(因为你在那里保留状态,而不是 fault-tolerant)。重复数据删除仍然需要处理。

此外,您可以查看 Kafka Connect 框架,而不是将您自己的使用者写入数据库。为了验证消息,您可以类似地部署 Kafka Streams 应用程序以从输入主题中过滤出不良消息,将其发送到主题中以发送到数据库