Kafka Consumer returns null value

I am trying to create a Kafka consumer in Java, but the consumer.poll(5000) method returns null no matter what. Here is the code:

package com.apache.kafka.consumer;

import java.util.Map;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.log4j.Logger;

public class Consumer {
    public static void main(String[] args) throws Exception {
        final Logger logger = Logger.getLogger(Consumer.class);

        // Kafka consumer configuration settings
        String topicName = "mytopic";
        Properties props = new Properties();

        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "test");
        props.put("enable.auto.commit", "true");
        props.put("auto.offset.reset", "earliest");
        props.put("auto.commit.interval.ms", "1000");
        props.put("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("partition.assignment.strategy", "range");
        KafkaConsumer<String, String> consumer =
                new KafkaConsumer<String, String>(props);

        // Kafka Consumer subscribes list of topics here.
        consumer.subscribe("sampletopic");
        while (true) {
            Map<String, ConsumerRecords<String, String>> records = consumer.poll(0);
            for (ConsumerRecords<String, String> record : records.values()) {
                System.out.println(records);
            }
        }
    }
}

Please help!

I have already created the topic and added some data to it, and both ZooKeeper and Kafka are up and running. I don't know why the poll() method is returning null.

The call to poll needs to be in a loop, which is why the literature calls it a poll loop.

If it returns null, then either it polled too early and main exited, or there is no data in the topic.

See the usage example here: https://kafka.apache.org/22/javadoc/org/apache/kafka/clients/consumer/KafkaConsumer.html

     Properties props = new Properties();
     props.setProperty("bootstrap.servers", "localhost:9092");
     props.setProperty("group.id", "test");
     props.setProperty("enable.auto.commit", "true");
     props.setProperty("auto.commit.interval.ms", "1000");
     props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
     props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
     KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
     consumer.subscribe(Arrays.asList("foo", "bar"));
     while (true) {
         ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
         for (ConsumerRecord<String, String> record : records)
             System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
     }

Note the loop ^
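
To make the "polled too early" point concrete without needing a broker, here is a rough JDK-only sketch. FakeConsumer, deliver and pollLoop are hypothetical names made up for this illustration and are not part of the Kafka API; the queue simply stands in for records arriving from a topic. It only assumes the general behaviour described above: a single poll issued before any data is available comes back with nothing, while polling in a loop eventually picks the data up.

```java
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for a consumer; NOT the Kafka client.
class FakeConsumer {
    private final BlockingQueue<String> queue = new LinkedBlockingQueue<>();

    // Simulates a record arriving from the broker.
    void deliver(String value) {
        queue.add(value);
    }

    // Waits up to `timeout` for a first record, then drains anything else
    // that is already queued. Returns an empty list when nothing arrives in time.
    List<String> poll(Duration timeout) {
        try {
            String first = queue.poll(timeout.toMillis(), TimeUnit.MILLISECONDS);
            if (first == null) {
                return Collections.emptyList();
            }
            List<String> batch = new ArrayList<>();
            batch.add(first);
            queue.drainTo(batch);
            return batch;
        } catch (InterruptedException e) {
            // Restore the interrupt flag and report "nothing received".
            Thread.currentThread().interrupt();
            return Collections.emptyList();
        }
    }
}

class PollLoopSketch {
    // Polls repeatedly until `expected` records were seen or `maxPolls` polls were made.
    static List<String> pollLoop(FakeConsumer consumer, int expected, int maxPolls) {
        List<String> seen = new ArrayList<>();
        for (int i = 0; i < maxPolls && seen.size() < expected; i++) {
            seen.addAll(consumer.poll(Duration.ofMillis(100)));
        }
        return seen;
    }

    public static void main(String[] args) throws Exception {
        FakeConsumer consumer = new FakeConsumer();

        // A single poll issued before any data is available comes back empty.
        System.out.println("early poll: " + consumer.poll(Duration.ofMillis(10)));

        // The data only shows up a little later.
        Thread producer = new Thread(() -> {
            try {
                Thread.sleep(150);
            } catch (InterruptedException e) {
                return;
            }
            consumer.deliver("hello");
            consumer.deliver("world");
        });
        producer.start();

        // Polling in a loop keeps asking until the records have arrived.
        System.out.println("poll loop: " + pollLoop(consumer, 2, 50));
        producer.join();
    }
}
```

With a real KafkaConsumer the loop has the same shape as the Javadoc example above; the bounded maxPolls here is only there so the sketch terminates.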