为什么在分配不存在的主题时 KafkaConsumer 不抛出错误?

Why doesn't a KafkaConsumer throw an error when a non-existing topic is assigned?

我写了一个KafkaConsumer。配置如下所示:

  @Bean
  Map<String, Object> consumerConfig(
      @Value("${spring.kafka.bootstrap-servers}") String bootstrapServers) {
    return Map.of(
        ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
        bootstrapServers,
        ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,
        "earliest",
        ConsumerConfig.ALLOW_AUTO_CREATE_TOPICS_CONFIG,
        false,
        ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
        StringDeserializer.class,
        ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
        StringDeserializer.class);
  }

当我将不存在的主题分配给 KafkaConsumer 时,不会抛出任何错误。这是代码:

var topicPartition = new TopicPartition("75757584959595943", key);
var partitions = Set.of(topicPartition);
consumer.assign(partitions);
for (var records = consumer.poll(Duration.ZERO); !records.isEmpty(); ) {
  // ...

为什么 KafkaConsumer 不提醒我有关不存在的主题?这没有帮助吗?

它确实提醒;在日志中,您会看到信息消息,包括 UNKNOWN_TOPIC_OR_PARTITION

这不是致命的异常。消费者将继续获取集群元数据并等待主题存在,然后在存在时进行轮询。

如果您希望 spring 创建主题,请为其创建一个 NewTopic bean。

如果您想检查主题是否存在,并抛出您自己的异常,请使用 AdminClient 和 describeTopics 方法