如何使用 spring kafka 批处理侦听器进行有状态重试
How to do Stateful retry with spring kafka batch listener
我正在阅读此处的文档https://docs.spring.io/spring-kafka/docs/2.2.6.RELEASE/reference/html/#retrying-deliveries,但我无法弄清楚使用批处理侦听器实现有状态重试的正确方法
文档说 "retry adapter is not provided for batch message listeners because the framework has no knowledge of where in a batch the failure occurred".
这对我的用例来说不是问题,因为我只想重试整个批次。
文档建议我在侦听器本身中使用 RetryTemplate。好的,我可以做到。
问题出现在下一节,其中讨论了使用有状态重试标志在重试之间进行消费者轮询,以防止代理丢弃我的消费者。
如何配置批处理侦听器来执行此操作?批处理侦听器是否支持状态重试标志?如果我的重试逻辑在侦听器本身内,那不会阻止轮询吗? statefulRetry 标志到底做了什么?
否;您不能将 RetryTemplate
添加到批处理侦听器的容器工厂。
java.lang.ClassCastException: org.springframework.kafka.listener.adapter.BatchMessagingMessageListenerAdapter cannot be cast to org.springframework.kafka.listener.MessageListener
We will clean that up with a more meaningful error..
随着即将发布的 2.3 版本(目前下周五发布的候选版本)you can add a BackOff
to the SeekToCurrentErrorHandler
,它提供了与 RetryTemplate
类似的功能;对于当前版本,将立即尝试重新交付。
此外,另一个新功能 recently merged 提供了一种从特定索引批量重试的机制。
最新版本的springkafka有一个特殊的RetryingBatchErrorHandler。 https://docs.spring.io/spring-kafka/docs/2.4.6.RELEASE/reference/html/#retrying-batch-eh 谢谢,Spring Kafka 团队!
我正在阅读此处的文档https://docs.spring.io/spring-kafka/docs/2.2.6.RELEASE/reference/html/#retrying-deliveries,但我无法弄清楚使用批处理侦听器实现有状态重试的正确方法
文档说 "retry adapter is not provided for batch message listeners because the framework has no knowledge of where in a batch the failure occurred".
这对我的用例来说不是问题,因为我只想重试整个批次。
文档建议我在侦听器本身中使用 RetryTemplate。好的,我可以做到。
问题出现在下一节,其中讨论了使用有状态重试标志在重试之间进行消费者轮询,以防止代理丢弃我的消费者。
如何配置批处理侦听器来执行此操作?批处理侦听器是否支持状态重试标志?如果我的重试逻辑在侦听器本身内,那不会阻止轮询吗? statefulRetry 标志到底做了什么?
否;您不能将 RetryTemplate
添加到批处理侦听器的容器工厂。
java.lang.ClassCastException: org.springframework.kafka.listener.adapter.BatchMessagingMessageListenerAdapter cannot be cast to org.springframework.kafka.listener.MessageListener
We will clean that up with a more meaningful error..
随着即将发布的 2.3 版本(目前下周五发布的候选版本)you can add a BackOff
to the SeekToCurrentErrorHandler
,它提供了与 RetryTemplate
类似的功能;对于当前版本,将立即尝试重新交付。
此外,另一个新功能 recently merged 提供了一种从特定索引批量重试的机制。
最新版本的springkafka有一个特殊的RetryingBatchErrorHandler。 https://docs.spring.io/spring-kafka/docs/2.4.6.RELEASE/reference/html/#retrying-batch-eh 谢谢,Spring Kafka 团队!