无法使用基于 java 的 Kafka 消费者读取一条消息
Failed to read one message wth java-based Kafka consumer
我尝试实现一个 java Kafka 消费者。我使用 Kafka 服务器版本 0.9。
这是测试用的,所以我只需要看一条消息就可以了。
public static ConsumerRecords<String, String> readFromKafka() {
ConsumerRecords<String, String> records = null;
try {
Properties kafkaProps = new Properties();
kafkaProps.put("bootstrap.servers", "<KAFKA_SERVER_HOST>:9092");
kafkaProps.put("auto.commit.enable", "false");
kafkaProps.put("value.deserializer", StringDeserializer.class.getName());
kafkaProps.put("key.deserializer", StringDeserializer.class.getName());
kafkaProps.put("client.id", "testScore0");
kafkaProps.put("group.id", "testScore1");
kafkaProps.put("auto.offset.reset", "latest");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(kafkaProps);
consumer.subscribe(Arrays.asList("my_topic"));
records = consumer.poll(0);
} catch (Exception e) {
logger.error("Can not read from kafka", e);
}
return records;
}
返回的记录对象为空:
我在连接到相同 KAFKA_SERVER_HOST 的本地计算机上执行命令行 Kafka 消费者并获取消息。
更改
的轮询时间
records = consumer.poll(0);
对于大于 0 的值,尝试使用 100。
records = consumer.poll(100);
我尝试实现一个 java Kafka 消费者。我使用 Kafka 服务器版本 0.9。 这是测试用的,所以我只需要看一条消息就可以了。
public static ConsumerRecords<String, String> readFromKafka() {
ConsumerRecords<String, String> records = null;
try {
Properties kafkaProps = new Properties();
kafkaProps.put("bootstrap.servers", "<KAFKA_SERVER_HOST>:9092");
kafkaProps.put("auto.commit.enable", "false");
kafkaProps.put("value.deserializer", StringDeserializer.class.getName());
kafkaProps.put("key.deserializer", StringDeserializer.class.getName());
kafkaProps.put("client.id", "testScore0");
kafkaProps.put("group.id", "testScore1");
kafkaProps.put("auto.offset.reset", "latest");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(kafkaProps);
consumer.subscribe(Arrays.asList("my_topic"));
records = consumer.poll(0);
} catch (Exception e) {
logger.error("Can not read from kafka", e);
}
return records;
}
返回的记录对象为空:
我在连接到相同 KAFKA_SERVER_HOST 的本地计算机上执行命令行 Kafka 消费者并获取消息。
更改
的轮询时间records = consumer.poll(0);
对于大于 0 的值,尝试使用 100。
records = consumer.poll(100);