spring云流kafka binder重试
spring cloud stream kafka binder retry
我有 spring 云流应用程序,它带有消费和发送消息的 kafka 活页夹。
在应用程序中,我使用重试策略配置自定义错误处理程序,并向处理程序添加不可重试的异常。配置示例:
@Bean
public ListenerContainerCustomizer<AbstractMessageListenerContainer<?, ?>> customizer(
SeekToCurrentErrorHandler customErrorHandler
) {
return (((container, destinationName, group) -> {
container.setErrorHandler(customErrorHandler);
}));
}
@Bean
public SeekToCurrentErrorHandler customErrorHandler() {
var errorHandler = new SeekToCurrentErrorHandler(
(consumerRecord, e) -> log.error("Got exception skip record record: {}", consumerRecord, e),
new FixedBackOff(1000L, 10)
);
errorHandler.addNotRetryableException(App.MyCustomException.class);
return errorHandler;
}
但我知道,如果抛出异常,应用程序会重试处理消息 3 次。
预期行为 - 如果 App.MyCustomException.class 抛出,将不会重复使用消息。
如何为 spring 云流 kafka 活页夹应用程序配置重试策略?
此处的代码示例:github
运行 测试重现问题。
您提供的自定义是针对 container-level 错误处理程序的。 Binder 有不同的重试机制。您可以在您的配置中添加以下内容,以确保发生异常时记录不是re-tried。
spring.cloud.stream:
bindings:
processor-in-0:
...
consumer:
retryableExceptions:
ru.vichukano.kafka.binder.retry.App.MyCustomException: false
当我尝试这样做时,我没有看到消息 re-delivered。
这里有一些explanations。
我有 spring 云流应用程序,它带有消费和发送消息的 kafka 活页夹。 在应用程序中,我使用重试策略配置自定义错误处理程序,并向处理程序添加不可重试的异常。配置示例:
@Bean
public ListenerContainerCustomizer<AbstractMessageListenerContainer<?, ?>> customizer(
SeekToCurrentErrorHandler customErrorHandler
) {
return (((container, destinationName, group) -> {
container.setErrorHandler(customErrorHandler);
}));
}
@Bean
public SeekToCurrentErrorHandler customErrorHandler() {
var errorHandler = new SeekToCurrentErrorHandler(
(consumerRecord, e) -> log.error("Got exception skip record record: {}", consumerRecord, e),
new FixedBackOff(1000L, 10)
);
errorHandler.addNotRetryableException(App.MyCustomException.class);
return errorHandler;
}
但我知道,如果抛出异常,应用程序会重试处理消息 3 次。 预期行为 - 如果 App.MyCustomException.class 抛出,将不会重复使用消息。 如何为 spring 云流 kafka 活页夹应用程序配置重试策略?
此处的代码示例:github
运行 测试重现问题。
您提供的自定义是针对 container-level 错误处理程序的。 Binder 有不同的重试机制。您可以在您的配置中添加以下内容,以确保发生异常时记录不是re-tried。
spring.cloud.stream:
bindings:
processor-in-0:
...
consumer:
retryableExceptions:
ru.vichukano.kafka.binder.retry.App.MyCustomException: false
当我尝试这样做时,我没有看到消息 re-delivered。
这里有一些explanations。