KafkaAvroDeserializer Issue

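I am trying to deserialize Kafka Avro messages using the Avro Kafka deserializer. This code is quite common and many users already run it in practice, but I ran into some difficulties when implementing it. Code: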

Properties props = new Properties();
props.put("bootstrap.servers", "broker1:9092,broker2:9092");
props.put("group.id", "CountryCounter");
// A consumer is configured with deserializers, so the keys are key.deserializer / value.deserializer
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "io.confluent.kafka.serializers.KafkaAvroDeserializer");
props.put("schema.registry.url", schemaUrl);
String topic = "customerContacts";

KafkaConsumer consumer = new KafkaConsumer(createConsumerConfig(brokers, groupId, url));
consumer.subscribe(Collections.singletonList(topic));

System.out.println("Reading topic:" + topic);

while (true) {
    @SuppressWarnings("unchecked")
    ConsumerRecords<String, ATPEvent> records = (ConsumerRecords<String, ATPEvent>) consumer.poll(1000); //1

    for (ConsumerRecord<String, ATPEvent> record : records) { //2
        try {
            kafkaMessageInputStream = new ByteBufferInputStream(Lists.newArrayList(ByteBuffer.wrap(record.value()))); //3
            avroBinaryDecoder = avroDecoderFactory.binaryDecoder(kafkaMessageInputStream, avroBinaryDecoder);
            avroEvent = reader.read(avroEvent, avroBinaryDecoder);
            System.out.println(avroEvent);
            kafkaMessageInputStream.close();
        } catch (Exception ex) {
            System.out.println("Unable to process event from kafka, see exception details: " + ex);
        }
    }
    consumer.commitSync(); //4
}

Now, here are the 4 issues:

  1. I have to add the cast, otherwise I get: Type mismatch: cannot convert from Map<String,ConsumerRecords<String,ATPEvent>> to ConsumerRecords<String,ATPEvent>

  2. Can only iterate over an array or an instance of java.lang.Iterable. I don't understand why this happens. Can I do this instead?:

    List<ConsumerRecord<String, ATPEvent>> records = (List<ConsumerRecord<String, ATPEvent>>) consumer.poll(1000);
    
    for (ConsumerRecord<String, ATPEvent> record: records) {
    
  3. The method wrap(byte[]) in the type ByteBuffer is not applicable for the arguments (ATPEvent). I understand this one, but how can I convert a class into bytes? Is there another way?

  4. The method commitSync() is undefined for the type KafkaConsumer<String,ATPEvent>. Can I just use consumer.close() instead?

Please provide solutions for 2 and 3, and, if possible, explanations for 1 and 4.

Which version of Kafka are you using, 0.8.x or 0.9.x?

There is a difference between the two.

Kafka 0.8.x:

  1. The return type of poll() is Map<String, ConsumerRecords> (see http://www.webhostingreviewjam.com/mirror/apache/kafka/0.8.2-beta/java-doc/org/apache/kafka/clients/consumer/KafkaConsumer.html)
  2. Use ConsumerRecords#records(...) to get a List<ConsumerRecord> that you can iterate over (see http://www.webhostingreviewjam.com/mirror/apache/kafka/0.8.2-beta/java-doc/org/apache/kafka/clients/consumer/ConsumerRecords.html)
  3. ConsumerRecord.value() returns byte[] (see http://www.webhostingreviewjam.com/mirror/apache/kafka/0.8.2-beta/java-doc/org/apache/kafka/clients/consumer/ConsumerRecord.html#value%28%29)
  4. Use commit(boolean) (see http://www.webhostingreviewjam.com/mirror/apache/kafka/0.8.2-beta/java-doc/org/apache/kafka/clients/consumer/KafkaConsumer.html#commit%28boolean%29); commitSync() is only available in 0.9.x
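
To illustrate why a Map-shaped poll result triggers errors 1 and 2, here is a minimal sketch that uses plain java.util types as stand-ins for the Kafka classes. The class name PollShapeSketch, the methods pollAsMap and flatten, and the String "records" are all made up for illustration and are not part of the Kafka API. The point is only that a Map is not Iterable, so the for-each loop has to go through values() and then through each per-topic list.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class PollShapeSketch {

    // Hypothetical stand-in for a 0.8.x-style poll result: topic name -> records of that topic.
    public static Map<String, List<String>> pollAsMap() {
        Map<String, List<String>> byTopic = new HashMap<>();
        byTopic.put("customerContacts", Arrays.asList("event-1", "event-2"));
        return byTopic;
    }

    // A Map cannot be used directly in a for-each loop, so iterate over
    // its values() and then over each per-topic list.
    public static List<String> flatten(Map<String, List<String>> byTopic) {
        List<String> all = new ArrayList<>();
        for (List<String> perTopic : byTopic.values()) {
            for (String record : perTopic) {
                all.add(record);
            }
        }
        return all;
    }

    public static void main(String[] args) {
        // for (String r : pollAsMap()) { }  // would not compile: Map is not Iterable
        System.out.println(flatten(pollAsMap()));
    }
}
```

Casting the Map to another type only silences the compiler. The object returned at runtime is still a Map, so the cast itself would be expected to fail when the code runs.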

Kafka 0.9.x:

  1. The return type of poll() is ConsumerRecords (see https://kafka.apache.org/090/javadoc/index.html?org/apache/kafka/clients/consumer/KafkaConsumer.html)
  2. ConsumerRecords implements Iterable (see https://kafka.apache.org/090/javadoc/index.html?org/apache/kafka/clients/consumer/KafkaConsumer.html), so you can use for (ConsumerRecord r : records)
  3. ConsumerRecord#value returns T (in your case T == ATPEvent) (see https://kafka.apache.org/090/javadoc/index.html?org/apache/kafka/clients/consumer/KafkaConsumer.html)
  4. commitSync() is available in 0.9.x
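
For the 0.9.x case, where value() already hands back a deserialized object, the consumer configuration could look roughly like the sketch below. It only builds a java.util.Properties object, so it runs without the Kafka jars. The class name AvroConsumerProps, the build method, and the registry URL http://localhost:8081 are hypothetical. The specific.avro.reader setting is included on the assumption that ATPEvent is a class generated from an Avro schema; without it the Confluent deserializer returns GenericRecord values rather than the generated class.

```java
import java.util.Properties;

public class AvroConsumerProps {

    // Builds consumer properties for String keys and Avro values.
    public static Properties build(String brokers, String groupId, String schemaUrl) {
        Properties props = new Properties();
        props.put("bootstrap.servers", brokers);
        props.put("group.id", groupId);
        // Consumers are configured with deserializers, not serializers.
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "io.confluent.kafka.serializers.KafkaAvroDeserializer");
        props.put("schema.registry.url", schemaUrl);
        // Assumption: ATPEvent is a generated Avro SpecificRecord class,
        // so ask the deserializer for specific records instead of GenericRecord.
        props.put("specific.avro.reader", "true");
        return props;
    }

    public static void main(String[] args) {
        // Hypothetical schema registry URL, used here only for illustration.
        Properties props = build("broker1:9092,broker2:9092", "CountryCounter", "http://localhost:8081");
        props.list(System.out);
    }
}
```

With properties like these passed to a KafkaConsumer<String, ATPEvent>, record.value() is already an ATPEvent, so the manual ByteBuffer.wrap(...) and binary decoder steps from the question should no longer be needed.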