spring-cloud-stream kafka错误处理

spring-cloud-stream kafka error handling

我浏览了 spring-cloud-stream 1.0.0.RELEASE 的文档,似乎找不到任何关于错误处理的文档。

根据对 kafka 0.9 的观察,如果我的消费者抛出 RuntimeException,我会看到 3 次重试。 3 次重试后,我在日志中看到:

2016-05-17 09:35:59.216 ERROR 8983 --- [  kafka-binder-] o.s.i.k.listener.LoggingErrorHandler     : Error while processing: KafkaMessage [Message(magic = 0, attributes = 0, crc = 3731457175, key = null, payload = java.nio.HeapByteBuffer[pos=0 lim=130 cap=130]), KafkaMessageMetadata [offset=2, nextOffset=3, Partition[topic='reservation', id=1]]

org.springframework.messaging.MessagingException: Exception thrown while invoking demo.sink.ReservationConsumer#handleReservation[1 args]; nested exception is java.lang.RuntimeException: no message

此时消费者offset滞后1,如果我重启消费者,消息又重试了3次。但是,如果我随后向同一个分区发送另一条消息,这样消费者就不会抛出异常,消费者偏移量就会更新,并且我们抛出异常的原始消息在重启后将不再重试。

是否有我没有找到的记录?错误处理是特定于活页夹的,还是 s-c-s 将其抽象为在活页夹之间保持一致?我怀疑这是使用 kafka 活页夹更新消费者偏移量的意外结果。我看到添加了一个 enableDlq kafka consumer 属性,我正准备对其进行测试,但我不确定我们如何处理 kafka 中的死信。我熟悉 rabbitmq 中的死信队列,但是对于 rabbitmq,我们能够使用 rabbitmq shovel 插件重新发布和重试 dlq 消息,以涵盖由于临时服务中断而导致失败的情况。我不知道有任何类似的功能可用于 kafka,除了我们自己编写类似的实用程序之外。

更新:启用 enableDlq kafka 消费者 属性 的测试显示了与错误处理相同的消费者偏移问题。当消费者抛出 RuntimeException 时,我看到 3 次重试,之后未记录错误消息,并且我看到一条消息发布到 error.<destination>.<group> 记录,但消费者偏移量未更新且滞后 1。如果我重启消费者,它尝试再次处理来自原始主题分区的相同失败消息,重试 3 次并再次将相同消息放入 error.<destination>.<group> 主题(重复的 dlq 消息)。如果我将另一条消息发布到同一主题分区,而消费者未为其抛出 RuntimeException,则偏移量将更新,并且原始失败的消息不再在重新启动时重试。

我认为消费者应该在消费者抛出错误时更新kafka中的消费者偏移量,无论enableDlq是否为真。这至少会使所有重试失败的消息被丢弃(当 enableDlq 为 false 时)或发布到 dlq 并且永远不会重试(当 enableDlq 为 true 时)是一致的。

在我看来像是一个错误 - 侦听器容器有一个 属性 autoCommitOnError(默认情况下为 false),它不会被活页夹公开(或设置)。在调用错误处理程序(发布错误)后,如果布尔值为真,则提交偏移量。

请在 github 中将其作为问题报告。