如何从kafka服务器获取主题中的所有消息

how to get the all messages in a topic from kafka server

我想从服务器获取主题中从头开始的所有消息。

例如:

bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic testTopic --from-beginning

当使用上面的控制台命令时,我希望能够从头开始获取主题中的所有消息,但我无法使用 java 代码从头开始使用主题中的所有消息。

最简单的方法是启动一个消费者并耗尽所有消息。现在我不知道你的主题有多少个分区,也不知道你是否已经有一个现有的消费者组,但你有几个选择:

看看这个API:https://kafka.apache.org/090/javadoc/index.html?org/apache/kafka/clients/consumer/KafkaConsumer.html

1) 如果你已经在同一个消费者组中有一个消费者,并且仍然想从头开始消费,你应该使用 API 文档中列出的 seek 选项并设置组中每个消费者的偏移量为 0。这将从头开始消耗。

2)否则,你可以在一个新的消费者组中启动几个消费者,你就不用担心寻找了。

PS:如果您以后对 Kafka 有更多疑问,请记得提供有关您的设置的更多详细信息。很多事情取决于您如何配置您的基础设施以及您希望它成为什么样子,因此会因情况而异。

TopicPartition topicPartition = new TopicPartition(topic, 0);
List<TopicPartition> partitions = Arrays.asList(topicPartition); 
consumer.assign(partitions);
consumer.seekToBeginning(partitions);

只需更改消费组

ConsumerConfig.GROUP_ID_CONFIG - to new group id

并设置

AUTO_OFFSET_RESET_CONFIG - earliest

示例代码-

    props.put(ConsumerConfig.GROUP_ID_CONFIG, "newID");
    props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

您可以使用以下命令获取所有消息:

cd Users/kv/kafka/bin

./kafka-console-consumer.sh --bootstrap-server localhost:9092 \
  --topic topicName --from-beginning --max-messages 100