KafkaAvroDeserializer Issue
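I am trying to deserialize Kafka Avro messages with the Avro Kafka deserializer. This code is common, and many users already run it in practice, but I ran into some difficulties when implementing it:
Code: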
Properties props = new Properties();
props.put("bootstrap.servers", "broker1:9092,broker2:9092");
props.put("group.id", "CountryCounter");
// A consumer is configured with deserializers, not serializers.
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "io.confluent.kafka.serializers.KafkaAvroDeserializer");
props.put("schema.registry.url", schemaUrl);
String topic = "customerContacts";
KafkaConsumer consumer = new KafkaConsumer(createConsumerConfig(brokers, groupId, url));
consumer.subscribe(Collections.singletonList(topic));
System.out.println("Reading topic:" + topic);
while (true) {
    @SuppressWarnings("unchecked")
    ConsumerRecords<String, ATPEvent> records = (ConsumerRecords<String, ATPEvent>) consumer.poll(1000); //1
    for (ConsumerRecord<String, ATPEvent> record : records) { //2
        try {
            kafkaMessageInputStream = new ByteBufferInputStream(Lists.newArrayList(ByteBuffer.wrap(record.value()))); //3
            avroBinaryDecoder = avroDecoderFactory.binaryDecoder(kafkaMessageInputStream, avroBinaryDecoder);
            avroEvent = reader.read(avroEvent, avroBinaryDecoder);
            System.out.println(avroEvent);
            kafkaMessageInputStream.close();
        } catch (Exception ex) {
            System.out.println("Unable to process event from kafka, see exception details" + ex);
        }
    }
    consumer.commitSync(); //4
}
Now, here are the 4 issues:
1. I have to add the cast, otherwise I get this error:
Type mismatch: cannot convert from Map<String,ConsumerRecords<String,ATPEvent>> to ConsumerRecords<String,ATPEvent>
2. Can only iterate over an array or an instance of java.lang.Iterable
I don't know why this happens. Can I do this instead?
List<ConsumerRecord<String, ATPEvent>> records = (List<ConsumerRecord<String, ATPEvent>>) consumer.poll(1000);
for (ConsumerRecord<String, ATPEvent> record : records) {
3. The method wrap(byte[]) in the type ByteBuffer is not applicable for the arguments (ATPEvent)
I understand this one, but how can I convert a class to bytes? Is there another way?
4. The method commitSync() is undefined for the type KafkaConsumer<String,ATPEvent>
Can I just use consumer.close() instead?
Please provide solutions for 2 and 3 and, if possible, explanations for 1 and 4.
Which version of Kafka are you using? There are differences between 0.8.x and 0.9.x.
Kafka 0.8.x:
- The return type of poll() is Map<String, ConsumerRecords> (see http://www.webhostingreviewjam.com/mirror/apache/kafka/0.8.2-beta/java-doc/org/apache/kafka/clients/consumer/KafkaConsumer.html)
- Use ConsumerRecords#records(...) to get a List<ConsumerRecord> to iterate over (see http://www.webhostingreviewjam.com/mirror/apache/kafka/0.8.2-beta/java-doc/org/apache/kafka/clients/consumer/ConsumerRecords.html)
- ConsumerRecord.value() returns byte[] (see http://www.webhostingreviewjam.com/mirror/apache/kafka/0.8.2-beta/java-doc/org/apache/kafka/clients/consumer/ConsumerRecord.html#value%28%29)
- Use commit(boolean) (see http://www.webhostingreviewjam.com/mirror/apache/kafka/0.8.2-beta/java-doc/org/apache/kafka/clients/consumer/KafkaConsumer.html#commit%28boolean%29); commitSync() is only available in 0.9.x
Kafka 0.9.x:
- The return type of poll() is ConsumerRecords (see https://kafka.apache.org/090/javadoc/index.html?org/apache/kafka/clients/consumer/KafkaConsumer.html)
- ConsumerRecords implements Iterable (see https://kafka.apache.org/090/javadoc/index.html?org/apache/kafka/clients/consumer/KafkaConsumer.html), so you can use for (ConsumerRecord r : records)
- ConsumerRecord#value returns T (in your case T == ATPEvent) (see https://kafka.apache.org/090/javadoc/index.html?org/apache/kafka/clients/consumer/KafkaConsumer.html)
- commitSync() is available in 0.9.x
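For the 0.9.x case, the consumer settings could be collected in a helper like the createConsumerConfig(...) that the question calls but never shows. The following is only a minimal sketch of what such a helper might look like, not the asker's actual implementation. The class name ConsumerConfigSketch and the registry URL in main are hypothetical. The specific.avro.reader setting rests on the assumption that ATPEvent is an Avro-generated SpecificRecord class. Consumer configurations take key.deserializer and value.deserializer keys.

```java
import java.util.Properties;

public class ConsumerConfigSketch {

    // Hypothetical stand-in for the createConsumerConfig(...) helper that the
    // question calls but does not show.
    static Properties createConsumerConfig(String brokers, String groupId, String schemaUrl) {
        Properties props = new Properties();
        props.put("bootstrap.servers", brokers);
        props.put("group.id", groupId);
        // Consumers are configured with deserializers, not serializers.
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "io.confluent.kafka.serializers.KafkaAvroDeserializer");
        props.put("schema.registry.url", schemaUrl);
        // Assumption: ATPEvent is an Avro-generated SpecificRecord class. Without
        // this setting the Confluent deserializer returns GenericRecord values.
        props.put("specific.avro.reader", "true");
        return props;
    }

    public static void main(String[] args) {
        // The registry URL is a placeholder, not a value taken from the question.
        Properties props = createConsumerConfig(
                "broker1:9092,broker2:9092", "CountryCounter", "http://localhost:8081");
        props.forEach((key, value) -> System.out.println(key + "=" + value));
    }
}
```

With the 0.9.x client on the classpath, these properties would be passed to new KafkaConsumer<String, ATPEvent>(props). Declaring the consumer with its type parameters means poll() already returns ConsumerRecords<String, ATPEvent>, so neither the cast nor the manual ByteBuffer decoding in the loop should be needed.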