Spring 启动 Kafka 配置 DefaultErrorHandler?
Spring Boot Kafka Configure DefaultErrorHandler?
我根据 Spring Kafka 文档创建了一个批处理消费者:
@SpringBootApplication
public class ApplicationConsumer {
private static final Logger LOGGER = LoggerFactory.getLogger(ApplicationConsumer.class);
private static final String TOPIC = "foo";
public static void main(String[] args) {
ConfigurableApplicationContext context = SpringApplication.run(ApplicationConsumer.class, args);
}
@Bean
public RecordMessageConverter converter() {
return new JsonMessageConverter();
}
@Bean
public BatchMessagingMessageConverter batchConverter() {
return new BatchMessagingMessageConverter(converter());
}
@KafkaListener(topics = TOPIC)
public void listen(List<Name> ps) {
LOGGER.info("received name beans: {}", Arrays.toString(ps.toArray()));
}
}
我能够通过定义以下附加配置环境变量成功获得消费者 运行,Spring 会自动获取:
export SPRING_KAFKA_BOOTSTRAP-SERVERS=...
export SPRING_KAFKA_CONSUMER_GROUP-ID=...
所以上面的代码有效。但现在我想自定义默认错误处理程序以使用指数退避。从参考文档中,我尝试将以下内容添加到 ApplicationConsumer class:
@Bean
public ConcurrentKafkaListenerContainerFactory<?, ?> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setCommonErrorHandler(new DefaultErrorHandler(new ExponentialBackOffWithMaxRetries(10)));
factory.setConsumerFactory(consumerFactory());
return factory;
}
@Bean
public ConsumerFactory<String, Object> consumerFactory() {
return new DefaultKafkaConsumerFactory<>(consumerConfigs());
}
@Bean
public Map<String, Object> consumerConfigs() {
Map<String, Object> props = new HashMap<>();
return props;
}
但是现在我收到错误消息说找不到某些配置。看起来我不得不重新定义之前已经自动定义的 consumerConfigs() 中的所有属性。这包括从 bootstrap 服务器 uris 到 json-反序列化配置的所有内容。
有什么好的方法可以更新我的第一个版本代码以覆盖默认错误处理程序吗?
只需将错误处理程序定义为 @Bean
,Boot 会自动将其连接到其自动配置的容器工厂中。
编辑
这符合我的预期:
@SpringBootApplication
public class So70884203Application {
public static void main(String[] args) {
SpringApplication.run(So70884203Application.class, args);
}
@Bean
DefaultErrorHandler eh() {
return new DefaultErrorHandler((rec, ex) -> {
System.out.println("Recovered: " + rec);
}, new FixedBackOff(0L, 0L));
}
@KafkaListener(id = "so70884203", topics = "so70884203")
void listen(String in) {
System.out.println(in);
throw new RuntimeException("test");
}
@Bean
NewTopic topic() {
return TopicBuilder.name("so70884203").partitions(1).replicas(1).build();
}
}
foo
Recovered: ConsumerRecord(topic = so70884203, partition = 0, leaderEpoch = 0, offset = 0, CreateTime = 1643316625291, serialized key size = -1, serialized value size = 3, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = foo)
我根据 Spring Kafka 文档创建了一个批处理消费者:
@SpringBootApplication
public class ApplicationConsumer {
private static final Logger LOGGER = LoggerFactory.getLogger(ApplicationConsumer.class);
private static final String TOPIC = "foo";
public static void main(String[] args) {
ConfigurableApplicationContext context = SpringApplication.run(ApplicationConsumer.class, args);
}
@Bean
public RecordMessageConverter converter() {
return new JsonMessageConverter();
}
@Bean
public BatchMessagingMessageConverter batchConverter() {
return new BatchMessagingMessageConverter(converter());
}
@KafkaListener(topics = TOPIC)
public void listen(List<Name> ps) {
LOGGER.info("received name beans: {}", Arrays.toString(ps.toArray()));
}
}
我能够通过定义以下附加配置环境变量成功获得消费者 运行,Spring 会自动获取:
export SPRING_KAFKA_BOOTSTRAP-SERVERS=...
export SPRING_KAFKA_CONSUMER_GROUP-ID=...
所以上面的代码有效。但现在我想自定义默认错误处理程序以使用指数退避。从参考文档中,我尝试将以下内容添加到 ApplicationConsumer class:
@Bean
public ConcurrentKafkaListenerContainerFactory<?, ?> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setCommonErrorHandler(new DefaultErrorHandler(new ExponentialBackOffWithMaxRetries(10)));
factory.setConsumerFactory(consumerFactory());
return factory;
}
@Bean
public ConsumerFactory<String, Object> consumerFactory() {
return new DefaultKafkaConsumerFactory<>(consumerConfigs());
}
@Bean
public Map<String, Object> consumerConfigs() {
Map<String, Object> props = new HashMap<>();
return props;
}
但是现在我收到错误消息说找不到某些配置。看起来我不得不重新定义之前已经自动定义的 consumerConfigs() 中的所有属性。这包括从 bootstrap 服务器 uris 到 json-反序列化配置的所有内容。
有什么好的方法可以更新我的第一个版本代码以覆盖默认错误处理程序吗?
只需将错误处理程序定义为 @Bean
,Boot 会自动将其连接到其自动配置的容器工厂中。
编辑
这符合我的预期:
@SpringBootApplication
public class So70884203Application {
public static void main(String[] args) {
SpringApplication.run(So70884203Application.class, args);
}
@Bean
DefaultErrorHandler eh() {
return new DefaultErrorHandler((rec, ex) -> {
System.out.println("Recovered: " + rec);
}, new FixedBackOff(0L, 0L));
}
@KafkaListener(id = "so70884203", topics = "so70884203")
void listen(String in) {
System.out.println(in);
throw new RuntimeException("test");
}
@Bean
NewTopic topic() {
return TopicBuilder.name("so70884203").partitions(1).replicas(1).build();
}
}
foo
Recovered: ConsumerRecord(topic = so70884203, partition = 0, leaderEpoch = 0, offset = 0, CreateTime = 1643316625291, serialized key size = -1, serialized value size = 3, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = foo)