如何在 Kafka Consumer API 中使用 key 读取数据?
How to read data using key in Kafka Consumer API?
我正在使用以下代码构建消息...
Producer<String, String> producer = new kafka.javaapi.producer.Producer<String, String>(producerConfig);
KeyedMessage<String, String> keyedMsg = new KeyedMessage<String, String>(topic, "device-420", "{message:'hello world'}");
producer.send(keyedMsg);
并使用以下代码块消费...
//Key = topic name, Value = No. of threads for topic
Map<String, Integer> topicCount = new HashMap<String, Integer>();
topicCount.put(topic, 1);
//ConsumerConnector creates the message stream for each topic
Map<String, List<KafkaStream<byte[], byte[]>>> consumerStreams = consumerConnector.createMessageStreams(topicCount);
// Get Kafka stream for topic
List<KafkaStream<byte[], byte[]>> kStreamList = consumerStreams.get(topic);
// Iterate stream using ConsumerIterator
for (final KafkaStream<byte[], byte[]> kStreams : kStreamList) {
ConsumerIterator<byte[], byte[]> consumerIte = kStreams.iterator();
while (consumerIte.hasNext()) {
MessageAndMetadata<byte[], byte[]> msg = consumerIte.next();
System.out.println(topic.toUpperCase() + ">"
+ " Partition:" + msg.partition()
+ " | Key:"+ new String(msg.key())
+ " | Offset:" + msg.offset()
+ " | Message:"+ new String(msg.message()));
}
}
一切正常,因为我正在明智地阅读数据主题。所以我想知道有没有办法使用消息 key 来消耗数据,即本例中的 device-420?
简答:没有。
Kafka 中最小的粒度是一个partition。您可以编写一个只从单个分区读取的客户端。但是,一个分区可以包含多个键,您需要消耗该分区中包含的所有键。
我正在使用以下代码构建消息...
Producer<String, String> producer = new kafka.javaapi.producer.Producer<String, String>(producerConfig);
KeyedMessage<String, String> keyedMsg = new KeyedMessage<String, String>(topic, "device-420", "{message:'hello world'}");
producer.send(keyedMsg);
并使用以下代码块消费...
//Key = topic name, Value = No. of threads for topic
Map<String, Integer> topicCount = new HashMap<String, Integer>();
topicCount.put(topic, 1);
//ConsumerConnector creates the message stream for each topic
Map<String, List<KafkaStream<byte[], byte[]>>> consumerStreams = consumerConnector.createMessageStreams(topicCount);
// Get Kafka stream for topic
List<KafkaStream<byte[], byte[]>> kStreamList = consumerStreams.get(topic);
// Iterate stream using ConsumerIterator
for (final KafkaStream<byte[], byte[]> kStreams : kStreamList) {
ConsumerIterator<byte[], byte[]> consumerIte = kStreams.iterator();
while (consumerIte.hasNext()) {
MessageAndMetadata<byte[], byte[]> msg = consumerIte.next();
System.out.println(topic.toUpperCase() + ">"
+ " Partition:" + msg.partition()
+ " | Key:"+ new String(msg.key())
+ " | Offset:" + msg.offset()
+ " | Message:"+ new String(msg.message()));
}
}
一切正常,因为我正在明智地阅读数据主题。所以我想知道有没有办法使用消息 key 来消耗数据,即本例中的 device-420?
简答:没有。
Kafka 中最小的粒度是一个partition。您可以编写一个只从单个分区读取的客户端。但是,一个分区可以包含多个键,您需要消耗该分区中包含的所有键。