Kafka10 消费者与 Kafka8 消费者
Kafka10 consumer vs Kafka8 consumer
我一直在使用 Kafka8 并尝试迁移到 kafka10。
我们有一个包含 10 个分区的主题,用于创建一个包含 10 个消费者的消费者组,如下所示。
public void run(int a_numThreads) {
Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
topicCountMap.put(topic, new Integer(a_numThreads));
Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap);
List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topic);
// now launch all the threads
//
executor = Executors.newFixedThreadPool(a_numThreads);
// now create an object to consume the messages
//
int threadNumber = 0;
for (final KafkaStream stream : streams) {
executor.execute(new ConsumerTest(stream, threadNumber));
threadNumber++;
}
}
这里,我们根据分区数来传递线程数。
但是,使用 kafka10 的消费者不确定是否有类似的东西。这里它不会 return 基于分区的流。
public static void main(String[] args) {
Properties props = new Properties();
props.put("bootstrap.servers", "192.168.33.10:9092");
props.put("group.id", "group-1");
props.put("enable.auto.commit", "true");
props.put("auto.commit.interval.ms", "1000");
props.put("auto.offset.reset", "earliest");
props.put("session.timeout.ms", "30000");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(props);
kafkaConsumer.subscribe(Arrays.asList("HelloKafkaTopic"));
while (true) {
ConsumerRecords<String, String> records = kafkaConsumer.poll(100);
for (ConsumerRecord<String, String> record : records) {
System.out.printf("offset = %d, value = %s", record.offset(), record.value());
System.out.println();
}
}
}
提前致谢
新的消费者实现了一个简单高效的实现,可以处理来自单个线程的所有 IO。这与老消费者截然不同。有关详细信息,请参阅此博客:
我一直在使用 Kafka8 并尝试迁移到 kafka10。
我们有一个包含 10 个分区的主题,用于创建一个包含 10 个消费者的消费者组,如下所示。
public void run(int a_numThreads) {
Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
topicCountMap.put(topic, new Integer(a_numThreads));
Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap);
List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topic);
// now launch all the threads
//
executor = Executors.newFixedThreadPool(a_numThreads);
// now create an object to consume the messages
//
int threadNumber = 0;
for (final KafkaStream stream : streams) {
executor.execute(new ConsumerTest(stream, threadNumber));
threadNumber++;
}
}
这里,我们根据分区数来传递线程数。
但是,使用 kafka10 的消费者不确定是否有类似的东西。这里它不会 return 基于分区的流。
public static void main(String[] args) {
Properties props = new Properties();
props.put("bootstrap.servers", "192.168.33.10:9092");
props.put("group.id", "group-1");
props.put("enable.auto.commit", "true");
props.put("auto.commit.interval.ms", "1000");
props.put("auto.offset.reset", "earliest");
props.put("session.timeout.ms", "30000");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(props);
kafkaConsumer.subscribe(Arrays.asList("HelloKafkaTopic"));
while (true) {
ConsumerRecords<String, String> records = kafkaConsumer.poll(100);
for (ConsumerRecord<String, String> record : records) {
System.out.printf("offset = %d, value = %s", record.offset(), record.value());
System.out.println();
}
}
}
提前致谢
新的消费者实现了一个简单高效的实现,可以处理来自单个线程的所有 IO。这与老消费者截然不同。有关详细信息,请参阅此博客: