Spring-kafka error handling with DeadLetterPublishingRecoverer
I am trying to implement error handling in Spring Boot Kafka. In my Kafka listener I throw a runtime exception, as shown below:
@KafkaListener(topics = "Kafka-springboot-example", groupId = "group-employee-json")
public void consumeEmployeeJson(Employee employee) {
    logger.info("Consumed Employee JSON: " + employee);
    if (null == employee.getEmployeeId()) {
        throw new RuntimeException("failed");
        //throw new ListenerExecutionFailedException("failed");
    }
}
I have configured error handling as follows:
@Configuration
@EnableKafka
public class KafkaConfiguration {

    @Bean
    public ConcurrentKafkaListenerContainerFactory<Object, Object> containerFactory(
            ConcurrentKafkaListenerContainerFactoryConfigurer configurer,
            ConsumerFactory<Object, Object> kafkaConsumerFactory,
            KafkaTemplate<Object, Object> template) {
        ConcurrentKafkaListenerContainerFactory<Object, Object> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        configurer.configure(factory, kafkaConsumerFactory);
        factory.setErrorHandler(new SeekToCurrentErrorHandler(
                new DeadLetterPublishingRecoverer(template)));
        return factory;
    }
}
My DLT listener is as follows:
@KafkaListener(topics = "Kafka-springboot-example.DLT", groupId = "group-employee-json")
public void consumeEmployeeErrorJson(Employee employee) {
    logger.info("Consumed Employee JSON from DLT topic: " + employee);
}
But my messages are not being published to the DLT topic.
Any idea what I am doing wrong?
Edited:
application.properties
server.port=8088
#kafka-producer-config
spring.kafka.producer.bootstrap-servers=localhost:9092
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer
#Kafka consumer properties
spring.kafka.consumer.bootstrap-servers=localhost:9092
spring.kafka.consumer.group-id=group-employee-json
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer
spring.kafka.consumer.properties.spring.json.trusted.packages=*
public ConcurrentKafkaListenerContainerFactory<Object, Object> containerFactory(
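If you use a non-standard bean name for the container factory, you need to set it in the containerFactory property on the @KafkaListener.
The default bean name is kafkaListenerContainerFactory, which is the bean that Boot auto-configures. You need to either override that bean or configure the listeners to point to your non-standard bean name.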
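As a rough sketch of the second option, the listener below points at the question's containerFactory bean by name, so the SeekToCurrentErrorHandler and DeadLetterPublishingRecoverer configured on that factory should apply to it. It assumes the KafkaConfiguration class from the question is left unchanged and that spring-kafka and Spring Boot are on the classpath. The EmployeeListener class name and the Employee stub (including the String id type) are placeholders for illustration, not taken from the original code.

```java
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

// Hypothetical stand-in for the question's Employee class; the field type is an assumption.
class Employee {
    private String employeeId;

    public String getEmployeeId() {
        return employeeId;
    }

    public void setEmployeeId(String employeeId) {
        this.employeeId = employeeId;
    }
}

@Component
class EmployeeListener {

    private static final Logger logger = LoggerFactory.getLogger(EmployeeListener.class);

    // containerFactory names the custom bean from KafkaConfiguration. Without it,
    // the listener uses Boot's default kafkaListenerContainerFactory, which has
    // no dead-letter recoverer attached.
    @KafkaListener(topics = "Kafka-springboot-example",
                   groupId = "group-employee-json",
                   containerFactory = "containerFactory")
    public void consumeEmployeeJson(Employee employee) {
        logger.info("Consumed Employee JSON: " + employee);
        if (employee.getEmployeeId() == null) {
            // Handled by the error handler set on the custom factory.
            throw new RuntimeException("failed");
        }
    }
}
```

The other option is to leave the listeners as they are and rename the @Bean method in KafkaConfiguration from containerFactory to kafkaListenerContainerFactory, so that it replaces the bean Boot would otherwise auto-configure.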