Spring Kafka:将 KafkaListenerErrorHandler 应用于所有 KafkaListener
Spring Kafka: applying KafkaListenerErrorHandler to all KafkaListener
我想拦截我的@KafkaListener 抛出的验证错误 (MethodArgumentNotValidException)。为了实现这一点,我创建了实现 KafkaListenerErrorHandler:
的 DefaultKafkaValidationErrorHandler
@Component
public class DefaultKafkaValidationErrorHandler implements KafkaListenerErrorHandler {
@Override
public Object handleError(Message<?> message, ListenerExecutionFailedException exception, Consumer<?, ?> consumer) {
//here is some logic
}
@Override
public final Object handleError(Message<?> message, ListenerExecutionFailedException exception) {
//such as here
}
}
并为注册商设置一个验证器:
@Component
public class KafkaListenerConfig implements KafkaListenerConfigurer {
@Autowired
private LocalValidatorFactoryBean validator;
@Override
public void configureKafkaListeners(KafkaListenerEndpointRegistrar registrar) {
registrar.setValidator(this.validator);
}
}
当我将这个处理程序直接放到我的侦听器时它工作正常
@KafkaListener(
containerFactory = "factory",
topics = "topic",
groupId = "group.id",
errorHandler = "defaultKafkaValidationErrorHandler")
我的问题是,有没有办法将我的 DefaultKafkaValidationErrorHandler 应用到我项目中的所有 @KafkaListener,而不是像上面提到的那样直接应用。也许通过 ConcurrentKafkaListenerContainerFactory、KafkaListenerEndpointRegistrar 或其他方式
您可以覆盖 ConcurrentKafkaListenerContainerFactory
的 C createListenerContainer(KafkaListenerEndpoint endpoint);
方法并将全局 KafkaListenerErrorHandler
设置为所有单个 endpoint
。您的工厂必须在 KafkaListenerAnnotationBeanPostProcessor.DEFAULT_KAFKA_LISTENER_CONTAINER_FACTORY_BEAN_NAME
bean 名称下注册,以避免在每个 @KafkaListener
.
上配置该工厂
我想拦截我的@KafkaListener 抛出的验证错误 (MethodArgumentNotValidException)。为了实现这一点,我创建了实现 KafkaListenerErrorHandler:
的 DefaultKafkaValidationErrorHandler@Component
public class DefaultKafkaValidationErrorHandler implements KafkaListenerErrorHandler {
@Override
public Object handleError(Message<?> message, ListenerExecutionFailedException exception, Consumer<?, ?> consumer) {
//here is some logic
}
@Override
public final Object handleError(Message<?> message, ListenerExecutionFailedException exception) {
//such as here
}
}
并为注册商设置一个验证器:
@Component
public class KafkaListenerConfig implements KafkaListenerConfigurer {
@Autowired
private LocalValidatorFactoryBean validator;
@Override
public void configureKafkaListeners(KafkaListenerEndpointRegistrar registrar) {
registrar.setValidator(this.validator);
}
}
当我将这个处理程序直接放到我的侦听器时它工作正常
@KafkaListener(
containerFactory = "factory",
topics = "topic",
groupId = "group.id",
errorHandler = "defaultKafkaValidationErrorHandler")
我的问题是,有没有办法将我的 DefaultKafkaValidationErrorHandler 应用到我项目中的所有 @KafkaListener,而不是像上面提到的那样直接应用。也许通过 ConcurrentKafkaListenerContainerFactory、KafkaListenerEndpointRegistrar 或其他方式
您可以覆盖 ConcurrentKafkaListenerContainerFactory
的 C createListenerContainer(KafkaListenerEndpoint endpoint);
方法并将全局 KafkaListenerErrorHandler
设置为所有单个 endpoint
。您的工厂必须在 KafkaListenerAnnotationBeanPostProcessor.DEFAULT_KAFKA_LISTENER_CONTAINER_FACTORY_BEAN_NAME
bean 名称下注册,以避免在每个 @KafkaListener
.