Kafka Consumer returns null value
I am trying to create a Kafka consumer in Java, but the consumer.poll(5000) method call always returns null. Here is the code:
package com.apache.kafka.consumer;

import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.log4j.Logger;
import org.apache.kafka.clients.consumer.ConsumerRecords;

public class Consumer {
    public static void main(String[] args) throws Exception {
        final Logger logger = Logger.getLogger(Consumer.class);
        //Kafka consumer configuration settings
        String topicName = "mytopic";
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "test");
        props.put("enable.auto.commit", "true");
        props.put("auto.offset.reset","earliest");
        props.put("auto.commit.interval.ms", "1000");
        props.put("key.deserializer",
            "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer",
            "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("partition.assignment.strategy", "range");
        KafkaConsumer<String, String> consumer =
            new KafkaConsumer<String, String>(props);
        //Kafka Consumer subscribes list of topics here.
        consumer.subscribe("sampletopic");
        while (true) {
            Map<String,ConsumerRecords<String, String>> records = consumer.poll(0);
            for (ConsumerRecords<String, String> record : records.values()) {
                System.out.println(records);
            }
        }
    }
}
Please help!
I have already created the topic and added some data to it, and ZooKeeper and Kafka are both running fine. I don't know why the poll() method is returning null.
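The call to poll needs to be in a loop, which is why the literature calls it a poll loop.
If it returns null, it is either polling too soon and exiting main, or there is no data in the topic.
See the usage example here: https://kafka.apache.org/22/javadoc/org/apache/kafka/clients/consumer/KafkaConsumer.html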
// Imports needed by the snippet below
import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

Properties props = new Properties();
props.setProperty("bootstrap.servers", "localhost:9092");
props.setProperty("group.id", "test");
props.setProperty("enable.auto.commit", "true");
props.setProperty("auto.commit.interval.ms", "1000");
props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("foo", "bar"));
// The poll loop: keep polling, each call waits up to 100 ms for records
while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
    for (ConsumerRecord<String, String> record : records)
        System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
}
Note the loop ^
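If it helps to see the "polled too soon" case in isolation, here is a minimal sketch that does not touch Kafka at all. It assumes a plain java.util.concurrent.BlockingQueue as a stand-in for the topic; the class name PollLoopSketch, the pollLoop helper and its parameters are hypothetical names made up for this illustration, not part of the Kafka client API. It only shows that one immediate poll can come back with nothing, while a loop with a per-poll timeout picks the data up once it arrives.

```java
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for a consumer; this is NOT the Kafka API.
public class PollLoopSketch {

    // Polls the source repeatedly until `expected` records have arrived
    // or `maxPolls` polls have been made, whichever comes first.
    public static List<String> pollLoop(BlockingQueue<String> source, int expected,
                                        Duration perPollTimeout, int maxPolls)
            throws InterruptedException {
        List<String> received = new ArrayList<>();
        for (int i = 0; i < maxPolls && received.size() < expected; i++) {
            // Waits up to the timeout; returns null if nothing arrived in time.
            String record = source.poll(perPollTimeout.toMillis(), TimeUnit.MILLISECONDS);
            if (record != null) {
                received.add(record);
            }
        }
        return received;
    }

    public static void main(String[] args) throws Exception {
        BlockingQueue<String> source = new LinkedBlockingQueue<>();

        // A single immediate poll, made before any data is available.
        System.out.println("single poll: " + source.poll());

        // Simulated producer: data shows up a little later.
        Thread producer = new Thread(() -> {
            try {
                Thread.sleep(200);
                source.put("first");
                source.put("second");
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        producer.start();

        // The loop keeps polling, so it sees the records once they arrive.
        List<String> received = pollLoop(source, 2, Duration.ofMillis(100), 50);
        producer.join();
        System.out.println("poll loop: " + received);
    }
}
```

The loop here is bounded by maxPolls only so the sketch terminates; a real consumer would normally keep polling until it is shut down, as in the javadoc example above.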