SpringBoot Kafka 消费者:无法启动 bean internalKafkaListenerEndpointRegistry TimeoutException
SpringBoot Kafka Consumer: Failed to start bean internalKafkaListenerEndpointRegistry TimeoutException
我有一个使用 kafka 的 spring 引导应用程序,但我无法启动它,因为我刚刚实现了一个正在侦听离线服务器的 kafka 消费者。当我开始它时,我得到了一些:
org.springframework.context.ApplicationContextException: Failed to start bean 'org.springframework.kafka.config.internalKafkaListenerEndpointRegistry'; nested exception is org.apache.kafka.common.errors.TimeoutException: Timeout expired while fetching topic metadata
因为kafka宕机了
如何配置 spring 启动应用程序,使其在 kafka 服务器关闭时启动?
下面是我的 Kafka 消费者配置:
@EnableKafka
@Configuration
public class KafkaConsumerConfig {
@Value(value = "${app.kafka.bootstrap-servers}")
private String bootstrapServers;
@Value("${HOSTNAME:NO_HOSTNAME}")
private String groupId;
@Value(value = "${spring.profiles.active}")
private String activeSpringProfile;
@Bean
public ConsumerFactory<String, String> consumerFactory() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ConsumerConfig.GROUP_ID_CONFIG,String.format("RANDOM_GROUP_ID_%s_%s", groupId, RandomUtils.nextInt()));
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
return new DefaultKafkaConsumerFactory<>(props);
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
return factory;
}
}
谢谢!
对于最新版本,missingTopicsFatal
容器 属性 是 true
,这就是导致此问题的原因。您可以将其关闭...
@Component
class ContainerFactoryConfigurer {
ContainerFactoryConfigurer(ConcurrentKafkaListenerContainerFactory<?, ?> factory) {
factory.getContainerProperties().setMissingTopicsFatal(false);
}
}
我有一个使用 kafka 的 spring 引导应用程序,但我无法启动它,因为我刚刚实现了一个正在侦听离线服务器的 kafka 消费者。当我开始它时,我得到了一些:
org.springframework.context.ApplicationContextException: Failed to start bean 'org.springframework.kafka.config.internalKafkaListenerEndpointRegistry'; nested exception is org.apache.kafka.common.errors.TimeoutException: Timeout expired while fetching topic metadata
因为kafka宕机了
如何配置 spring 启动应用程序,使其在 kafka 服务器关闭时启动?
下面是我的 Kafka 消费者配置:
@EnableKafka
@Configuration
public class KafkaConsumerConfig {
@Value(value = "${app.kafka.bootstrap-servers}")
private String bootstrapServers;
@Value("${HOSTNAME:NO_HOSTNAME}")
private String groupId;
@Value(value = "${spring.profiles.active}")
private String activeSpringProfile;
@Bean
public ConsumerFactory<String, String> consumerFactory() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ConsumerConfig.GROUP_ID_CONFIG,String.format("RANDOM_GROUP_ID_%s_%s", groupId, RandomUtils.nextInt()));
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
return new DefaultKafkaConsumerFactory<>(props);
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
return factory;
}
}
谢谢!
对于最新版本,missingTopicsFatal
容器 属性 是 true
,这就是导致此问题的原因。您可以将其关闭...
@Component
class ContainerFactoryConfigurer {
ContainerFactoryConfigurer(ConcurrentKafkaListenerContainerFactory<?, ?> factory) {
factory.getContainerProperties().setMissingTopicsFatal(false);
}
}