Kafka:轮询期间的反序列化问题

Kafka : Deserialization issues during poll

为了更好地了解平台,我一直在玩 kafka 的 confluent 版本。对于发送到一个主题的一些格式错误的 avro 消息,我收到了一些序列化异常。让我用事实来解释问题:

<kafka.new.version>0.10.2.0-cp1</kafka.new.version>
<confluent.version>3.2.0</confluent.version>
<avro.version>1.7.7</avro.version>

意图:非常简单,生产者正在发送 Avro 记录,消费者应该毫无问题地消费所有记录,(它可以留下所有与模式注册表中的模式不兼容的消息。) 用法:

Producer -> 
Key -> StringSerializer
Value -> KafkaAvroSerializer

Consumer ->
Key -> StringDeserializer
Value -> KafkaAvroDeserializer

其他消费属性(仅供参考):

    properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "somehost:9092");
    properties.put(ConsumerConfig.GROUP_ID_CONFIG, "myconsumer-4");
    properties.put(ConsumerConfig.CLIENT_ID_CONFIG, "someclient-4");
    properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, org.apache.kafka.common.serialization.StringDeserializer.class);
    properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, io.confluent.kafka.serializers.KafkaAvroDeserializer.class);
    properties.put(AUTO_OFFSET_RESET_CONFIG, "earliest");
    properties.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, true);
    properties.put("schema.registry.url", "schemaregistryhost:8081");

我能够毫无问题地使用消息,直到其他一些生产者错误地向该主题发送了 一条 消息并且修改了模式注册表中的最新模式。 (我们在模式注册表中启用了一个选项,因此您可以将任何消息发送到主题,并且模式注册表每次都会制作一个新版本的模式,如果关闭我们也可以切换。)

现在,由于这个 一个 错误消息,poll() 因序列化问题而失败。它确实给了我失败的偏移量,我可以使用 seek() 传递偏移量,但这听起来不太好。我还尝试将最大轮询记录数设置为 10 并将 poll() 超时设置为非常小,以便我可以通过捕获异常来忽略最多 10 条记录,但由于某种原因,最大记录数无法正常工作并且代码会立即失败并出现序列化错误,即使我从开始和错误消息的偏移量为 240。

properties.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "10");

另一个简单的解决方案是在我的应用程序中使用 ByteArrayDeserializer 和 KafkaAvroDecoder,我可以处理反序列化问题。

我相信我缺少或做错了什么。也添加异常:

Exception in thread "main" org.apache.kafka.common.errors.SerializationException: Error deserializing key/value for partition topic.ongo.test3.user14-0 at offset 220
Caused by: org.apache.kafka.common.errors.SerializationException: Error deserializing Avro message for id 186
Caused by: org.apache.avro.AvroTypeException: Found com.catapult.TestUser, expecting com.catapult.TestUser, missing required field testname
    at org.apache.avro.io.ResolvingDecoder.doAction(ResolvingDecoder.java:292)
    at org.apache.avro.io.parsing.Parser.advance(Parser.java:88)
    at org.apache.avro.io.ResolvingDecoder.readFieldOrder(ResolvingDecoder.java:130)
    at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:176)
    at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:151)
    at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:142)
    at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.deserialize(AbstractKafkaAvroDeserializer.java:131)
    at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.deserialize(AbstractKafkaAvroDeserializer.java:92)
    at io.confluent.kafka.serializers.KafkaAvroDeserializer.deserialize(KafkaAvroDeserializer.java:54)
    at org.apache.kafka.clients.consumer.internals.Fetcher.parseRecord(Fetcher.java:869)
    at org.apache.kafka.clients.consumer.internals.Fetcher.parseCompletedFetch(Fetcher.java:775)
    at org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:473)
    at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1062)
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:995)

发现同一问题上已经有一个开放的jira票: https://issues.apache.org/jira/browse/KAFKA-4740