consumer.How指定分区读取? [卡夫卡]
consumer.How to specify partition to read? [kafka]
我正在学习Kafka,我想知道当我消费来自主题的消息时如何指定then分区。
我找到了几张这样的图片:
表示一个消费者可以消费多个分区的消息,但一个分区只能被一个消费者(在一个消费者组内)读取。
此外,我已经阅读了几个消费者示例,它们看起来像这样:
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "consumer-tutorial");
props.put("key.deserializer", StringDeserializer.class.getName());
props.put("value.deserializer", StringDeserializer.class.getName());
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
和:
订阅:
consumer.subscribe(Arrays.asList(“foo”, “bar”));
投票
try {
while (running) {
ConsumerRecords<String, String> records = consumer.poll(1000);
for (ConsumerRecord<String, String> record : records)
System.out.println(record.offset() + ": " + record.value());
}
} finally {
consumer.close();
}
这是如何工作的?我将从哪个分区读取消息?
有两种方法可以告诉 topic/partitions 你想消费什么:KafkaConsumer#assign()(你指定你想要的分区和你开始的偏移量)和 subscribe
(你加入一个消费者组,并且 partition/offset 将由组协调器根据同一消费者组中的消费者动态分配,并且可能在运行时发生变化)
在这两种情况下,您都需要poll
接收数据。
参见 https://kafka.apache.org/0110/javadoc/index.html?org/apache/kafka/clients/consumer/KafkaConsumer.html,尤其是段落 Consumer Groups and Topic Subscriptions
和 Manual Partition Assignment
我正在学习Kafka,我想知道当我消费来自主题的消息时如何指定then分区。
我找到了几张这样的图片:
表示一个消费者可以消费多个分区的消息,但一个分区只能被一个消费者(在一个消费者组内)读取。
此外,我已经阅读了几个消费者示例,它们看起来像这样:
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "consumer-tutorial");
props.put("key.deserializer", StringDeserializer.class.getName());
props.put("value.deserializer", StringDeserializer.class.getName());
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
和:
订阅:
consumer.subscribe(Arrays.asList(“foo”, “bar”));
投票
try {
while (running) {
ConsumerRecords<String, String> records = consumer.poll(1000);
for (ConsumerRecord<String, String> record : records)
System.out.println(record.offset() + ": " + record.value());
}
} finally {
consumer.close();
}
这是如何工作的?我将从哪个分区读取消息?
有两种方法可以告诉 topic/partitions 你想消费什么:KafkaConsumer#assign()(你指定你想要的分区和你开始的偏移量)和 subscribe
(你加入一个消费者组,并且 partition/offset 将由组协调器根据同一消费者组中的消费者动态分配,并且可能在运行时发生变化)
在这两种情况下,您都需要poll
接收数据。
参见 https://kafka.apache.org/0110/javadoc/index.html?org/apache/kafka/clients/consumer/KafkaConsumer.html,尤其是段落 Consumer Groups and Topic Subscriptions
和 Manual Partition Assignment