是否可以使用密钥和分区来使用 kafka 消息?
Is it possible to consume kafka messages using key and partition?
我正在使用 kafka_2.12 版本 2.3.0 我正在使用分区和将数据发布到 kafka 主题中钥匙。我需要找到一种方法,使用该方法可以使用键和分区组合使用主题中的特定消息。这样我就不必使用所有消息并迭代正确的消息。
目前我只能做到这一点
KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props)
consumer.subscribe(Collections.singletonList("topic"))
ConsumerRecords<String, String> records = consumer.poll(100)
def data = records.findAll {
it -> it.key().equals(key)
}
你不能"get messages by key from Kafka"。
如果可行的话,一个解决方案是拥有与密钥一样多的分区,并且始终将密钥的消息路由到同一分区。
作为分区的消息键
kafkaConsumer.assign(topicPartitions);
kafkaConsumer.seekToBeginning(topicPartitions);
// Pull records from kafka, keep polling until we get nothing back
final List<ConsumerRecord<byte[], byte[]>> allRecords = new ArrayList<>();
ConsumerRecords<byte[], byte[]> records;
do {
// Grab records from kafka
records = kafkaConsumer.poll(2000L);
logger.info("Found {} records in kafka", records.count());
// Add to our array list
records.forEach(allRecords::add);
}
while (!records.isEmpty());
仅使用主题名称访问主题的消息
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList(<Topic Name>,<Topic Name>));
while (true) {
ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records)
System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
}
有两种消费方式topic/partitions是:
- KafkaConsumer.assign() : Document link
- KafkaConsumer.subscribe() : Document link
所以,您无法通过按键获取消息。
如果您没有扩展分区的计划,请考虑使用 assign() 方法。因为所有带有特定密钥的消息都会转到同一个分区。
使用方法:
KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(properties);
TopicPartition partition = new TopicPartition("some-topic", 0);
consumer.assign(Arrays.asList(partition));
while(true){
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
String data = records.findAll {
it -> it.key().equals(key)
}
}
我正在使用 kafka_2.12 版本 2.3.0 我正在使用分区和将数据发布到 kafka 主题中钥匙。我需要找到一种方法,使用该方法可以使用键和分区组合使用主题中的特定消息。这样我就不必使用所有消息并迭代正确的消息。
目前我只能做到这一点
KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props)
consumer.subscribe(Collections.singletonList("topic"))
ConsumerRecords<String, String> records = consumer.poll(100)
def data = records.findAll {
it -> it.key().equals(key)
}
你不能"get messages by key from Kafka"。
如果可行的话,一个解决方案是拥有与密钥一样多的分区,并且始终将密钥的消息路由到同一分区。
作为分区的消息键
kafkaConsumer.assign(topicPartitions);
kafkaConsumer.seekToBeginning(topicPartitions);
// Pull records from kafka, keep polling until we get nothing back
final List<ConsumerRecord<byte[], byte[]>> allRecords = new ArrayList<>();
ConsumerRecords<byte[], byte[]> records;
do {
// Grab records from kafka
records = kafkaConsumer.poll(2000L);
logger.info("Found {} records in kafka", records.count());
// Add to our array list
records.forEach(allRecords::add);
}
while (!records.isEmpty());
仅使用主题名称访问主题的消息
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList(<Topic Name>,<Topic Name>));
while (true) {
ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records)
System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
}
有两种消费方式topic/partitions是:
- KafkaConsumer.assign() : Document link
- KafkaConsumer.subscribe() : Document link
所以,您无法通过按键获取消息。
如果您没有扩展分区的计划,请考虑使用 assign() 方法。因为所有带有特定密钥的消息都会转到同一个分区。
使用方法:
KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(properties);
TopicPartition partition = new TopicPartition("some-topic", 0);
consumer.assign(Arrays.asList(partition));
while(true){
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
String data = records.findAll {
it -> it.key().equals(key)
}
}