为什么在分配不存在的主题时 KafkaConsumer 不抛出错误?
Why doesn't a KafkaConsumer throw an error when a non-existing topic is assigned?
我写了一个KafkaConsumer
。配置如下所示:
@Bean
Map<String, Object> consumerConfig(
@Value("${spring.kafka.bootstrap-servers}") String bootstrapServers) {
return Map.of(
ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
bootstrapServers,
ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,
"earliest",
ConsumerConfig.ALLOW_AUTO_CREATE_TOPICS_CONFIG,
false,
ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
StringDeserializer.class,
ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
StringDeserializer.class);
}
当我将不存在的主题分配给 KafkaConsumer
时,不会抛出任何错误。这是代码:
var topicPartition = new TopicPartition("75757584959595943", key);
var partitions = Set.of(topicPartition);
consumer.assign(partitions);
for (var records = consumer.poll(Duration.ZERO); !records.isEmpty(); ) {
// ...
为什么 KafkaConsumer
不提醒我有关不存在的主题?这没有帮助吗?
它确实提醒;在日志中,您会看到信息消息,包括 UNKNOWN_TOPIC_OR_PARTITION
这不是致命的异常。消费者将继续获取集群元数据并等待主题存在,然后在存在时进行轮询。
如果您希望 spring 创建主题,请为其创建一个 NewTopic bean。
如果您想检查主题是否存在,并抛出您自己的异常,请使用 AdminClient 和 describeTopics 方法
我写了一个KafkaConsumer
。配置如下所示:
@Bean
Map<String, Object> consumerConfig(
@Value("${spring.kafka.bootstrap-servers}") String bootstrapServers) {
return Map.of(
ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
bootstrapServers,
ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,
"earliest",
ConsumerConfig.ALLOW_AUTO_CREATE_TOPICS_CONFIG,
false,
ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
StringDeserializer.class,
ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
StringDeserializer.class);
}
当我将不存在的主题分配给 KafkaConsumer
时,不会抛出任何错误。这是代码:
var topicPartition = new TopicPartition("75757584959595943", key);
var partitions = Set.of(topicPartition);
consumer.assign(partitions);
for (var records = consumer.poll(Duration.ZERO); !records.isEmpty(); ) {
// ...
为什么 KafkaConsumer
不提醒我有关不存在的主题?这没有帮助吗?
它确实提醒;在日志中,您会看到信息消息,包括 UNKNOWN_TOPIC_OR_PARTITION
这不是致命的异常。消费者将继续获取集群元数据并等待主题存在,然后在存在时进行轮询。
如果您希望 spring 创建主题,请为其创建一个 NewTopic bean。
如果您想检查主题是否存在,并抛出您自己的异常,请使用 AdminClient 和 describeTopics 方法