多个主题的 Kafka 消费者
Kafka consumer for multiple topic
我有一个主题列表(目前是 10 个),其大小将来会增加。我知道我们可以产生多个线程(每个主题)来从每个主题中消费,但在我的情况下,如果主题数量增加,那么从主题中消费的线程数量也会增加,这是我不希望的,因为主题不是将过于频繁地获取数据,因此线程将处于理想状态。
有什么办法可以让一个消费者消费所有topic吗?如果是,那么我们如何实现呢?此外,卡夫卡将如何维护偏移量?请提出答案。
我们可以使用以下方式订阅多个主题 API :
consumer.subscribe(Arrays.asList(topic1,topic2), ConsumerRebalanceListener obj)
消费者拥有主题信息,我们可以通过如下创建 OffsetAndMetadata 对象来使用 consumer.commitAsync 或 consumer.commitSync() 进行提交。
ConsumerRecords<String, String> records = consumer.poll(long value);
for (TopicPartition partition : records.partitions()) {
List<ConsumerRecord<String, String>> partitionRecords = records.records(partition);
for (ConsumerRecord<String, String> record : partitionRecords) {
System.out.println(record.offset() + ": " + record.value());
}
long lastOffset = partitionRecords.get(partitionRecords.size() - 1).offset();
consumer.commitSync(Collections.singletonMap(partition, new OffsetAndMetadata(lastOffset + 1)));
}
不需要多线程,可以一个消费者,从多个主题消费。
偏移量由 zookeeper 维护,因为 kafka-server 本身是无状态的。
每当消费者消费一条消息时,它的偏移量就会提交给 zookeeper 以保持未来的跟踪以仅处理每条消息一次。因此,即使在 kafka 失败的情况下,消费者也会从上次提交的偏移量的下一个开始消费。
我有一个主题列表(目前是 10 个),其大小将来会增加。我知道我们可以产生多个线程(每个主题)来从每个主题中消费,但在我的情况下,如果主题数量增加,那么从主题中消费的线程数量也会增加,这是我不希望的,因为主题不是将过于频繁地获取数据,因此线程将处于理想状态。
有什么办法可以让一个消费者消费所有topic吗?如果是,那么我们如何实现呢?此外,卡夫卡将如何维护偏移量?请提出答案。
我们可以使用以下方式订阅多个主题 API : consumer.subscribe(Arrays.asList(topic1,topic2), ConsumerRebalanceListener obj)
消费者拥有主题信息,我们可以通过如下创建 OffsetAndMetadata 对象来使用 consumer.commitAsync 或 consumer.commitSync() 进行提交。
ConsumerRecords<String, String> records = consumer.poll(long value);
for (TopicPartition partition : records.partitions()) {
List<ConsumerRecord<String, String>> partitionRecords = records.records(partition);
for (ConsumerRecord<String, String> record : partitionRecords) {
System.out.println(record.offset() + ": " + record.value());
}
long lastOffset = partitionRecords.get(partitionRecords.size() - 1).offset();
consumer.commitSync(Collections.singletonMap(partition, new OffsetAndMetadata(lastOffset + 1)));
}
不需要多线程,可以一个消费者,从多个主题消费。 偏移量由 zookeeper 维护,因为 kafka-server 本身是无状态的。 每当消费者消费一条消息时,它的偏移量就会提交给 zookeeper 以保持未来的跟踪以仅处理每条消息一次。因此,即使在 kafka 失败的情况下,消费者也会从上次提交的偏移量的下一个开始消费。