Kafka Consumes unprocessable messages - 如何在以后重新处理损坏的消息?
Kafka Consumes unprocessable messages - How to reprocess broken messages later?
我们正在使用 Spring Kafka 实现 Kafka 消费者。据我正确理解,如果单个消息的处理失败,可以选择
- 不关心,只是 ACK
- 使用
RetryTemplate
进行一些重试处理
- 如果这仍然不起作用,请使用
RecoveryCallback
进行一些自定义故障处理
我想知道您对此的最佳做法是什么。我想到了简单的应用程序异常,例如 DeserializationException(针对 JSON 格式的消息)或更长的本地存储停机时间等。这意味着需要一些额外的工作,例如修补程序部署,以修复损坏的应用程序以便能够重新处理错误消息。
由于丢失消息(即不处理它们)对我们来说不是一个选择,剩下的唯一选择是 IMO 将错误消息存储在某个持久性存储中,例如。 G。另一个 "faulty messages" Kafka 主题,以便稍后可以再次处理这些事件,而无需完全停止事件处理。
你如何处理这些情况?
一个例子是Spring Cloud Stream,它可以配置为将失败的消息发布到另一个主题errors.foo;然后用户可以将它们复制回原始主题,稍后再试。
这个逻辑在恢复回调中完成。
我们有一个用例,我们根本无法丢弃任何消息,即使是错误消息也是如此。所以当我们遇到错误的消息时,我们会发送一条默认消息来代替那条错误的记录,同时将消息发送到失败的主题以便稍后重试。
我们正在使用 Spring Kafka 实现 Kafka 消费者。据我正确理解,如果单个消息的处理失败,可以选择
- 不关心,只是 ACK
- 使用
RetryTemplate
进行一些重试处理
- 如果这仍然不起作用,请使用
RecoveryCallback
进行一些自定义故障处理
我想知道您对此的最佳做法是什么。我想到了简单的应用程序异常,例如 DeserializationException(针对 JSON 格式的消息)或更长的本地存储停机时间等。这意味着需要一些额外的工作,例如修补程序部署,以修复损坏的应用程序以便能够重新处理错误消息。
由于丢失消息(即不处理它们)对我们来说不是一个选择,剩下的唯一选择是 IMO 将错误消息存储在某个持久性存储中,例如。 G。另一个 "faulty messages" Kafka 主题,以便稍后可以再次处理这些事件,而无需完全停止事件处理。
你如何处理这些情况?
一个例子是Spring Cloud Stream,它可以配置为将失败的消息发布到另一个主题errors.foo;然后用户可以将它们复制回原始主题,稍后再试。
这个逻辑在恢复回调中完成。
我们有一个用例,我们根本无法丢弃任何消息,即使是错误消息也是如此。所以当我们遇到错误的消息时,我们会发送一条默认消息来代替那条错误的记录,同时将消息发送到失败的主题以便稍后重试。