如何为 GenericData.Record 编写 KafkaAvro Serde

How to write a KafkaAvro Serde for GenericData.Record

我使用 Kafka 0.10.2 和 Avro 来序列化我的消息,包括键和值数据。 现在我想使用 Kafka Streams,但我一直在尝试为 GenericData.Record class.

编写 Serde class
import org.apache.avro.generic.GenericData.Record;
import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
import io.confluent.kafka.serializers.KafkaAvroDeserializer;
import io.confluent.kafka.serializers.KafkaAvroSerializer;
[...]

public final class KafkaAvroSerde implements Serde<Record> {

    private final Serde<Record> inner;

    public KafkaAvroSerde() {
        // Here I get the error
        inner = Serdes.serdeFrom(new KafkaAvroSerializer(), new KafkaAvroDeserializer());
    }

    public KafkaAvroSerde(SchemaRegistryClient client) {
        this(client, Collections.emptyMap());
    }

    public KafkaAvroSerde(SchemaRegistryClient client, Map<String, ?> props) {
        // Here I get the error
        inner = Serdes.serdeFrom(new KafkaAvroSerializer(client, props), new KafkaAvroDeserializer(client, props));
    }

    @Override
    public Serializer<Record> serializer() {
        return inner.serializer();
    }

    @Override
    public Deserializer<Record> deserializer() {
        return inner.deserializer();
    }

    @Override
    public void configure(Map<String, ?> configs, boolean isKey) {
        inner.serializer().configure(configs, isKey);
        inner.deserializer().configure(configs, isKey);
    }

    @Override
    public void close() {
        inner.serializer().close();
        inner.deserializer().close();
    }

}

这是我在注释行遇到的错误

Type mismatch: cannot convert from Serde<Object> to Serde<GenericData.Record>

我需要为 GenericData.Record(而不是我的特定 POJO)定义 Serde class,因为我可以在同一个通道上有不同的对象结构,所以反序列化器应该 return 我 GenericData (我将在这一步之后填充正确的 POJO)。

你会怎么做? 谢谢

好的,我想我做到了。我按照这个例子

https://github.com/JohnReedLOL/kafka-streams/blob/master/src/main/java/io/confluent/examples/streams/TopArticlesLambdaExample.java

我使用了生成 GenericRecord 对象的 GenericAvroSerde class,然后我可以使用它。

我希望这对其他人有用。

你已经问过了 question in the Confluent mailing list。这是我在那里发布的答案的摘要。

我们刚刚完成了针对 Kafka Streams 的官方 Confluent Avro serde(特定 Avro + 通用 Avro)的工作。参见 https://github.com/confluentinc/schema-registry/tree/master/avro-serde

新的 Avro serde,即 Confluent 模式注册表 aware/compatible,将与即将发布的 Confluent 3.3 一起发布,几周后。

在 3.3 发布之前,您可以从 master 分支构建您自己的工件(您必须首先使用 mvn install 构建 confluentinc/common and confluentinc/rest-utilsmaster 分支,然后是带有 mvn install 的模式注册表项目),或者例如将 类 复制粘贴到您自己的代码项目中。

Note: The master branch in the projects above and below are development branches, i.e. pre-release branches. Future readers of this answer should keep this in mind.

我们还提供了有关如何使用即将推出的新 Confluent Avro serde 的示例。您可以在 https://github.com/confluentinc/examples.

master 分支中找到演示