Spring 云流kafka消费者错误处理和重试问题
Spring cloud stream kafka consumer error handling and retries issues
我在 spring 云流 kafka 活页夹的错误处理场景中需要帮助。我的应用程序有 java 8 个消费者,其绑定在 application.yaml 中指定。消费者写成:
@Bean
public Consumer<Message<Transaction>> doProcess() {
return message -> {
Transaction transaction = message.getPayload();
if(true) {
throw new RuntimeException("exception!! !!:)");
}
Acknowledgment acknowledgment = message.getHeaders().get(KafkaHeaders.ACKNOWLEDGMENT,
Acknowledgment.class);
if (acknowledgment != null) {
System.out.println("Acknowledgment provided");
acknowledgment.acknowledge();
}
}
}
application.yaml:
spring.application.name: appname
spring.cloud.stream:
function.definition: doProcess
kafka:
default.consumer:
startOffset: latest
useNativeDecoding: true
bindings:
input.consumer.autoCommitOffset: false
bindings:
doProcess-in-0:
destination: kafka.input.topic.name
group: appGroup
content-type: application/*+avro
consumer:
autoCommitOffset: false.
现在,我正在努力处理错误并遇到两个问题:
我正在尝试手动确认消息的消耗,而不是将 autoCommitOffset 设为 true。因此,当我将 autoCommitOffset 设置为 false 并测试错误场景时,面对奇怪的行为,每当抛出异常时,消息都会重试 'n' 次,并且即使在之后也会重试/重新传递失败的消息重新启动服务(如果重新启动是在 n 重试完成之前完成的)。一旦重试完成,即使在服务重新启动后也不会选择消息。这是否意味着,消费者在消息的 n re-trial/re-delivery 之后提交偏移量,这不应该是因为 autoCommitOffset 为 false。
注意:我没有配置任何dlq。
我们需要编写自定义异常处理程序,我们可以在其中捕获异常(应用程序代码和框架中的错误)并通过 AWS 环境中的电子邮件向用户组发送通知。但是,我们无法找到任何可以捕获这两种类型异常的错误处理程序。类似于扩展 SeekToCurrentErrorHandler 或任何其他可以在错误事件上调用的侦听器。
编辑:
根据 Gary 提供的解决方案,我们可以使用以下 bean 来配置自定义错误处理程序:
@Bean
public ListenerContainerCustomizer<AbstractMessageListenerContainer> MQLCC() {
System.out.println(String.format("DEBUG: Bean %s has bean created.", "MQLCC"));
return new ListenerContainerCustomizerCustom ();
}
private static class ListenerContainerCustomizerCustom implements ListenerContainerCustomizer<AbstractMessageListenerContainer> {
@Override
public void configure(AbstractMessageListenerContainer container, String destinationName, String group) {
System.out.println(String.format("HELLO from container %s, destination: %s, group: %s", container, destinationName, group));
}
}
侦听器容器中默认的错误处理程序会重试10次,然后记录错误并丢弃记录;对于不同的行为,您需要配置自定义错误处理程序和恢复策略。使用 ListenerContainerCustomizer
bean 来配置容器。
见https://docs.spring.io/spring-kafka/docs/current/reference/html/#default-eh
和 https://docs.spring.io/spring-kafka/docs/current/reference/html/#dead-letters
(3.2 及更高版本)
或https://docs.spring.io/spring-kafka/docs/2.7.x/reference/html/#seek-to-current
和 https://docs.spring.io/spring-kafka/docs/2.7.x/reference/html/#dead-letters
对于早期版本。
我在 spring 云流 kafka 活页夹的错误处理场景中需要帮助。我的应用程序有 java 8 个消费者,其绑定在 application.yaml 中指定。消费者写成:
@Bean
public Consumer<Message<Transaction>> doProcess() {
return message -> {
Transaction transaction = message.getPayload();
if(true) {
throw new RuntimeException("exception!! !!:)");
}
Acknowledgment acknowledgment = message.getHeaders().get(KafkaHeaders.ACKNOWLEDGMENT,
Acknowledgment.class);
if (acknowledgment != null) {
System.out.println("Acknowledgment provided");
acknowledgment.acknowledge();
}
}
}
application.yaml:
spring.application.name: appname
spring.cloud.stream:
function.definition: doProcess
kafka:
default.consumer:
startOffset: latest
useNativeDecoding: true
bindings:
input.consumer.autoCommitOffset: false
bindings:
doProcess-in-0:
destination: kafka.input.topic.name
group: appGroup
content-type: application/*+avro
consumer:
autoCommitOffset: false.
现在,我正在努力处理错误并遇到两个问题:
我正在尝试手动确认消息的消耗,而不是将 autoCommitOffset 设为 true。因此,当我将 autoCommitOffset 设置为 false 并测试错误场景时,面对奇怪的行为,每当抛出异常时,消息都会重试 'n' 次,并且即使在之后也会重试/重新传递失败的消息重新启动服务(如果重新启动是在 n 重试完成之前完成的)。一旦重试完成,即使在服务重新启动后也不会选择消息。这是否意味着,消费者在消息的 n re-trial/re-delivery 之后提交偏移量,这不应该是因为 autoCommitOffset 为 false。
注意:我没有配置任何dlq。
我们需要编写自定义异常处理程序,我们可以在其中捕获异常(应用程序代码和框架中的错误)并通过 AWS 环境中的电子邮件向用户组发送通知。但是,我们无法找到任何可以捕获这两种类型异常的错误处理程序。类似于扩展 SeekToCurrentErrorHandler 或任何其他可以在错误事件上调用的侦听器。
编辑:
根据 Gary 提供的解决方案,我们可以使用以下 bean 来配置自定义错误处理程序:
@Bean
public ListenerContainerCustomizer<AbstractMessageListenerContainer> MQLCC() {
System.out.println(String.format("DEBUG: Bean %s has bean created.", "MQLCC"));
return new ListenerContainerCustomizerCustom ();
}
private static class ListenerContainerCustomizerCustom implements ListenerContainerCustomizer<AbstractMessageListenerContainer> {
@Override
public void configure(AbstractMessageListenerContainer container, String destinationName, String group) {
System.out.println(String.format("HELLO from container %s, destination: %s, group: %s", container, destinationName, group));
}
}
侦听器容器中默认的错误处理程序会重试10次,然后记录错误并丢弃记录;对于不同的行为,您需要配置自定义错误处理程序和恢复策略。使用 ListenerContainerCustomizer
bean 来配置容器。
见https://docs.spring.io/spring-kafka/docs/current/reference/html/#default-eh 和 https://docs.spring.io/spring-kafka/docs/current/reference/html/#dead-letters
(3.2 及更高版本)
或https://docs.spring.io/spring-kafka/docs/2.7.x/reference/html/#seek-to-current 和 https://docs.spring.io/spring-kafka/docs/2.7.x/reference/html/#dead-letters
对于早期版本。