如何为 GenericData.Record 编写 KafkaAvro Serde
How to write a KafkaAvro Serde for GenericData.Record
我使用 Kafka 0.10.2 和 Avro 来序列化我的消息,包括键和值数据。
现在我想使用 Kafka Streams,但我一直在尝试为 GenericData.Record
class.
编写 Serde
class
import org.apache.avro.generic.GenericData.Record;
import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
import io.confluent.kafka.serializers.KafkaAvroDeserializer;
import io.confluent.kafka.serializers.KafkaAvroSerializer;
[...]
public final class KafkaAvroSerde implements Serde<Record> {
private final Serde<Record> inner;
public KafkaAvroSerde() {
// Here I get the error
inner = Serdes.serdeFrom(new KafkaAvroSerializer(), new KafkaAvroDeserializer());
}
public KafkaAvroSerde(SchemaRegistryClient client) {
this(client, Collections.emptyMap());
}
public KafkaAvroSerde(SchemaRegistryClient client, Map<String, ?> props) {
// Here I get the error
inner = Serdes.serdeFrom(new KafkaAvroSerializer(client, props), new KafkaAvroDeserializer(client, props));
}
@Override
public Serializer<Record> serializer() {
return inner.serializer();
}
@Override
public Deserializer<Record> deserializer() {
return inner.deserializer();
}
@Override
public void configure(Map<String, ?> configs, boolean isKey) {
inner.serializer().configure(configs, isKey);
inner.deserializer().configure(configs, isKey);
}
@Override
public void close() {
inner.serializer().close();
inner.deserializer().close();
}
}
这是我在注释行遇到的错误
Type mismatch: cannot convert from Serde<Object> to Serde<GenericData.Record>
我需要为 GenericData.Record
(而不是我的特定 POJO)定义 Serde class,因为我可以在同一个通道上有不同的对象结构,所以反序列化器应该 return 我 GenericData
(我将在这一步之后填充正确的 POJO)。
你会怎么做?
谢谢
好的,我想我做到了。我按照这个例子
我使用了生成 GenericRecord 对象的 GenericAvroSerde class,然后我可以使用它。
我希望这对其他人有用。
你已经问过了 question in the Confluent mailing list。这是我在那里发布的答案的摘要。
我们刚刚完成了针对 Kafka Streams 的官方 Confluent Avro serde(特定 Avro + 通用 Avro)的工作。参见 https://github.com/confluentinc/schema-registry/tree/master/avro-serde。
新的 Avro serde,即 Confluent 模式注册表 aware/compatible,将与即将发布的 Confluent 3.3 一起发布,几周后。
在 3.3 发布之前,您可以从 master
分支构建您自己的工件(您必须首先使用 mvn install
构建 confluentinc/common and confluentinc/rest-utils 的 master
分支,然后是带有 mvn install
的模式注册表项目),或者例如将 类 复制粘贴到您自己的代码项目中。
Note: The master
branch in the projects above and below are development branches, i.e. pre-release branches. Future readers of this answer should keep this in mind.
我们还提供了有关如何使用即将推出的新 Confluent Avro serde 的示例。您可以在 https://github.com/confluentinc/examples.
的 master
分支中找到演示
我使用 Kafka 0.10.2 和 Avro 来序列化我的消息,包括键和值数据。
现在我想使用 Kafka Streams,但我一直在尝试为 GenericData.Record
class.
Serde
class
import org.apache.avro.generic.GenericData.Record;
import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
import io.confluent.kafka.serializers.KafkaAvroDeserializer;
import io.confluent.kafka.serializers.KafkaAvroSerializer;
[...]
public final class KafkaAvroSerde implements Serde<Record> {
private final Serde<Record> inner;
public KafkaAvroSerde() {
// Here I get the error
inner = Serdes.serdeFrom(new KafkaAvroSerializer(), new KafkaAvroDeserializer());
}
public KafkaAvroSerde(SchemaRegistryClient client) {
this(client, Collections.emptyMap());
}
public KafkaAvroSerde(SchemaRegistryClient client, Map<String, ?> props) {
// Here I get the error
inner = Serdes.serdeFrom(new KafkaAvroSerializer(client, props), new KafkaAvroDeserializer(client, props));
}
@Override
public Serializer<Record> serializer() {
return inner.serializer();
}
@Override
public Deserializer<Record> deserializer() {
return inner.deserializer();
}
@Override
public void configure(Map<String, ?> configs, boolean isKey) {
inner.serializer().configure(configs, isKey);
inner.deserializer().configure(configs, isKey);
}
@Override
public void close() {
inner.serializer().close();
inner.deserializer().close();
}
}
这是我在注释行遇到的错误
Type mismatch: cannot convert from Serde<Object> to Serde<GenericData.Record>
我需要为 GenericData.Record
(而不是我的特定 POJO)定义 Serde class,因为我可以在同一个通道上有不同的对象结构,所以反序列化器应该 return 我 GenericData
(我将在这一步之后填充正确的 POJO)。
你会怎么做? 谢谢
好的,我想我做到了。我按照这个例子
我使用了生成 GenericRecord 对象的 GenericAvroSerde class,然后我可以使用它。
我希望这对其他人有用。
你已经问过了 question in the Confluent mailing list。这是我在那里发布的答案的摘要。
我们刚刚完成了针对 Kafka Streams 的官方 Confluent Avro serde(特定 Avro + 通用 Avro)的工作。参见 https://github.com/confluentinc/schema-registry/tree/master/avro-serde。
新的 Avro serde,即 Confluent 模式注册表 aware/compatible,将与即将发布的 Confluent 3.3 一起发布,几周后。
在 3.3 发布之前,您可以从 master
分支构建您自己的工件(您必须首先使用 mvn install
构建 confluentinc/common and confluentinc/rest-utils 的 master
分支,然后是带有 mvn install
的模式注册表项目),或者例如将 类 复制粘贴到您自己的代码项目中。
Note: The
master
branch in the projects above and below are development branches, i.e. pre-release branches. Future readers of this answer should keep this in mind.
我们还提供了有关如何使用即将推出的新 Confluent Avro serde 的示例。您可以在 https://github.com/confluentinc/examples.
的master
分支中找到演示