Consuming Kafka batches for multiple partitions
I learned that Kafka can pull events in batches. I'm trying to understand this scenario:
- I have 4 partitions for one topic.
- I have 1 consumer, and Kafka assigns all 4 partitions to it.
- Let's assume each batch the Kafka client pulls from Kafka is 5 messages.
What I'm trying to understand is whether the events in one batch all come from the same partition, round-robining to the next partition for the next batch, or whether a single batch can already contain events from different partitions.
I can't give you a definitive answer, but I found it interesting enough to test.
To do that, I created a topic with four partitions and used the kafka-producer-perf-test command-line tool to produce some messages to the topic. Since the performance-test tool does not create any keys at all, the messages are written to the topic's partitions in round-robin fashion.
kafka-producer-perf-test --topic test --num-records 1337 --throughput -1 --record-size 128 --producer-props key.serializer=org.apache.kafka.common.serialization.StringSerializer --producer-props value.serializer=org.apache.kafka.common.serialization.StringSerializer --producer-props bootstrap.servers=localhost:9092
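With keyless records, the classic default partitioner simply cycles through the partitions (note that newer client versions use a sticky partitioner for keyless records instead, which fills a batch for one partition before moving on). A minimal sketch of plain round-robin assignment for the 1337 records produced above:

```java
import java.util.Arrays;

public class RoundRobinSketch {
    public static void main(String[] args) {
        int numPartitions = 4;   // the topic has four partitions
        int numRecords = 1337;   // matches --num-records above
        int[] perPartition = new int[numPartitions];
        for (int i = 0; i < numRecords; i++) {
            // keyless record i goes to partition i % numPartitions
            perPartition[i % numPartitions]++;
        }
        System.out.println(Arrays.toString(perPartition));
        // partitions end up nearly evenly filled: [335, 334, 334, 334]
    }
}
```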
Afterwards, I created a simple KafkaConsumer with max.poll.records=5 configured to match your question. The consumer simply prints the offset and partition of each message it consumes:
int counter = 0;
// consume messages with `poll` and print offset and partition of each record
try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(settings)) {
    consumer.subscribe(Arrays.asList(topic));
    while (true) {
        System.out.printf("Batch = %d\n", counter);
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
        for (ConsumerRecord<String, String> record : records) {
            System.out.printf("offset = %d, partition = %d\n", record.offset(), record.partition());
        }
        counter += 1;
    }
}
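The `settings` object isn't shown in the snippet above; a minimal configuration matching the experiment could look like this (the broker address and group id are assumptions on my part, only max.poll.records=5 is essential to the test):

```java
import java.util.Properties;

public class ConsumerSettings {
    static Properties build() {
        Properties settings = new Properties();
        settings.put("bootstrap.servers", "localhost:9092");  // assumed local broker
        settings.put("group.id", "batch-test");               // hypothetical group id
        settings.put("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        settings.put("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        settings.put("auto.offset.reset", "earliest");        // read from the beginning
        settings.put("max.poll.records", "5");                // at most 5 records per poll
        return settings;
    }
}
```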
To answer your question: the consumer tries to fetch as much data as possible from one partition before moving on to another. Only when all buffered messages from partition 1 had been consumed, but the max.poll.records limit of 5 was not yet reached, did it add two more messages from partition 2.
To get a better picture, here is some of the printed output.
Batch = 0
offset = 310, partition = 0
offset = 311, partition = 0
offset = 312, partition = 0
offset = 313, partition = 0
offset = 314, partition = 0
Batch = 1
offset = 315, partition = 0
offset = 316, partition = 0
offset = 317, partition = 0
offset = 318, partition = 0
offset = 319, partition = 0
# only offsets with partition 0
Batch = 45
offset = 525, partition = 0
offset = 526, partition = 0
offset = 527, partition = 0
offset = 528, partition = 0
offset = 529, partition = 0
Batch = 46
offset = 728, partition = 1
offset = 729, partition = 1
offset = 730, partition = 1
offset = 731, partition = 1
offset = 732, partition = 1
# only offsets with partition 1
Batch = 86
offset = 928, partition = 1
offset = 929, partition = 1
offset = 930, partition = 1
offset = 931, partition = 1
offset = 932, partition = 1
Batch = 87
offset = 465, partition = 2
offset = 466, partition = 2
offset = 933, partition = 1
offset = 934, partition = 1
offset = 935, partition = 1
Batch = 88
offset = 467, partition = 2
offset = 468, partition = 2
offset = 469, partition = 2
offset = 470, partition = 2
offset = 471, partition = 2
## and so on
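The drain-one-partition-first pattern seen in the output can be mimicked with a toy model of the consumer's fetch buffer. This is a deliberately simplified sketch, not the real client internals (which also involve fetch sizes, broker-side limits, and fetch ordering): each partition has a buffer of fetched records, and a poll drains earlier buffers before touching later ones.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

public class PollModel {
    // One buffered record: which partition it came from and its offset.
    record Rec(int partition, long offset) {}

    // Take up to maxPollRecords, draining earlier buffers before later ones.
    static List<Rec> poll(List<Deque<Rec>> buffers, int maxPollRecords) {
        List<Rec> batch = new ArrayList<>();
        for (Deque<Rec> buffer : buffers) {
            while (!buffer.isEmpty() && batch.size() < maxPollRecords) {
                batch.add(buffer.pollFirst());
            }
        }
        return batch;
    }

    public static void main(String[] args) {
        // Partition 0 has 7 buffered records, partition 1 has 6.
        Deque<Rec> p0 = new ArrayDeque<>();
        for (long o = 310; o < 317; o++) p0.add(new Rec(0, o));
        Deque<Rec> p1 = new ArrayDeque<>();
        for (long o = 728; o < 734; o++) p1.add(new Rec(1, o));
        List<Deque<Rec>> buffers = List.of(p0, p1);

        // First poll: 5 records, all from partition 0.
        System.out.println(poll(buffers, 5));
        // Second poll: the 2 leftovers from partition 0, topped up
        // with 3 records from partition 1 -- a mixed batch, like Batch 87 above.
        System.out.println(poll(buffers, 5));
    }
}
```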