Serde class 用于 AVRO 原始类型
Serde class for AVRO primitive type
我正在 Java 中编写一个 Kafka 流应用程序,它接受由一个连接器创建的输入主题,该连接器将模式注册表和 avro 用于键和值转换器。连接器生成以下架构:
key-schema: "int"
value-schema:{
"type": "record",
"name": "User",
"fields": [
{"name": "firstname", "type": "string"},
{"name": "lastname", "type": "string"}
]}
实际上,有几个主题,键模式总是"int",值模式总是某种记录(用户、产品等)。我的代码包含以下定义
Map<String, String> serdeConfig = Collections.singletonMap("schema.registry.url", schemaRegistryUrl);
Serde<User> userSerde = new SpecificAvroSerde<>();
userSerde.configure(serdeConfig, false);
起初我尝试用类似的东西来讨论这个话题
Consumed.with(Serdes.Integer(), userSerde);
但这不起作用,因为 Serdes.Integer() 期望使用 4 个字节对整数进行编码,但 avro 使用可变长度编码。使用 Consumed.with(Serdes.Bytes(), userSerde);
有效,但我真的想要 int 而不是字节,所以我将代码更改为这个
KafkaAvroDeserializer keyDeserializer = new KafkaAvroDeserializer()
KafkaAvroSerializer keySerializer = new KafkaAvroSerializer();
keyDeserializer.configure(serdeConfig, true);
keySerializer.configure(serdeConfig, true);
Serde<Integer> keySerde = (Serde<Integer>)(Serde)Serdes.serdeFrom(keySerializer, keyDeserializer);
这使编译器产生警告(它不喜欢 (Serde<Integer>)(Serde)
转换)但它允许我使用
Consumed.with(keySerde, userSerde);
并得到一个整数作为键。这工作得很好,我的应用程序按预期运行(太棒了!!!)。但是现在我想为 key/value 定义默认的 serde,但我无法让它工作。
设置默认值 serde 很简单:
streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, SpecificAvroSerde.class);
但是我不知道如何定义默认密钥 serde。
我试过了
streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, keySerde.getClass().getName());
产生运行时错误:无法为 org.apache.kafka.common.serialization.Serdes$WrapperSerde 找到 public 无参数构造函数
streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, SpecificAvroSerde.class);
产生运行时错误:java.lang.Integer 无法转换为 org.apache.avro.specific.SpecificRecord
我错过了什么?
谢谢。
更新 (5.5 及更新版本)
Confluent 版本 5.5
通过 PrimitiveAvroSerde
添加了对原始 Avro 类型的原生支持(参见 https://github.com/confluentinc/schema-registry/blob/5.5.x/avro-serde/src/main/java/io/confluent/kafka/streams/serdes/avro/PrimitiveAvroSerde.java)
原始答案 (5.4 及更早版本):
这是一个已知问题。原始 Avro 类型不适用于 Confluent 的 AvroSerdes,因为 Serdes 仅适用于 GenericAvroRecord
和 SpecificAvroRecord
。
因此,基于 KafkaAvroSerializer
和 KafkaAvroDeserializer
构建您自己的 Serde 是正确的方法。为了能够将其作为默认 Serde 传递到配置中,您不能使用 Serdes.serdeFrom
,因为类型信息由于泛型类型擦除而丢失。
但是,您可以实现自己的 class 来扩展 Serde
接口,并将您的自定义 class 传递到配置中:
public class MySerde extends Serde<Integer> {
// use KafkaAvroSerializer and KafkaAvroDeserializer and cast `Object` to `Integer`
}
config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, MySerde.class);
感谢@Matthias J. Sax 的提示,我想 post 围绕 solution.please 的工作免费增强它。
import java.util.Collections;
import java.util.Map;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.Serializer;
import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
import io.confluent.kafka.serializers.KafkaAvroDeserializer;
import io.confluent.kafka.serializers.KafkaAvroSerializer;
public class GenericPrimitiveAvroSerDe<T> implements Serde<T> {
private final Serde<Object> inner;
/**
* Constructor used by Kafka Streams.
*/
public GenericPrimitiveAvroSerDe() {
inner = Serdes.serdeFrom(new KafkaAvroSerializer(), new KafkaAvroDeserializer());
}
public GenericPrimitiveAvroSerDe(SchemaRegistryClient client) {
this(client, Collections.emptyMap());
}
public GenericPrimitiveAvroSerDe(SchemaRegistryClient client, Map<String, ?> props) {
inner = Serdes.serdeFrom(new KafkaAvroSerializer(client), new KafkaAvroDeserializer(client, props));
}
@Override
public void configure(final Map<String, ?> serdeConfig, final boolean isSerdeForRecordKeys) {
inner.serializer().configure(serdeConfig, isSerdeForRecordKeys);
inner.deserializer().configure(serdeConfig, isSerdeForRecordKeys);
}
@Override
public void close() {
// TODO Auto-generated method stub
inner.serializer().close();
inner.deserializer().close();
}
@SuppressWarnings("unchecked")
@Override
public Serializer<T> serializer() {
// TODO Auto-generated method stub
Object obj = inner.serializer();
return (Serializer<T>) obj;
}
@SuppressWarnings("unchecked")
@Override
public Deserializer<T> deserializer() {
// TODO Auto-generated method stub
Object obj = inner.deserializer();
return (Deserializer<T>) obj;
}
}
用作默认流配置:
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG,GenericPrimitiveAvroSerDe.class);
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG,GenericPrimitiveAvroSerDe.class);
覆盖默认值:
final Map<String, String> serdeConfig = Collections.singletonMap("schema.registry.url",
"http://localhost:8081");
final GenericPrimitiveAvroSerDe<String> keyGenericAvroSerde = new GenericPrimitiveAvroSerDe<String>();
keyGenericAvroSerde.configure(serdeConfig, true); // `true` for record keys
final GenericPrimitiveAvroSerDe<Long> valueGenericAvroSerde = new GenericPrimitiveAvroSerDe<Long>();
valueGenericAvroSerde.configure(serdeConfig, false); // `false` for record values
此外,如果您想将@Thiyaga Rajan 的出色解决方案与 KAFKA 消费者和生产者一起使用
consumerConfig.put(KEY_DESERIALIZER_CLASS_CONFIG,GenericPrimitiveAvroSerDe.class);
producerConfig.put(KEY_SERIALIZER_CLASS_CONFIG, GenericPrimitiveAvroSerDe.class);
将此添加到 class
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
public class GenericPrimitiveAvroSerDe<T> implements Serde<T>, Serializer<T>, Deserializer<T> {
@Override
public T deserialize(String topic, byte[] data) {
return this.deserializer().deserialize(topic, data);
}
@Override
public byte[] serialize(String topic, T data) {
return this.serializer().serialize(topic, data);
}
...
}
我正在 Java 中编写一个 Kafka 流应用程序,它接受由一个连接器创建的输入主题,该连接器将模式注册表和 avro 用于键和值转换器。连接器生成以下架构:
key-schema: "int"
value-schema:{
"type": "record",
"name": "User",
"fields": [
{"name": "firstname", "type": "string"},
{"name": "lastname", "type": "string"}
]}
实际上,有几个主题,键模式总是"int",值模式总是某种记录(用户、产品等)。我的代码包含以下定义
Map<String, String> serdeConfig = Collections.singletonMap("schema.registry.url", schemaRegistryUrl);
Serde<User> userSerde = new SpecificAvroSerde<>();
userSerde.configure(serdeConfig, false);
起初我尝试用类似的东西来讨论这个话题
Consumed.with(Serdes.Integer(), userSerde);
但这不起作用,因为 Serdes.Integer() 期望使用 4 个字节对整数进行编码,但 avro 使用可变长度编码。使用 Consumed.with(Serdes.Bytes(), userSerde);
有效,但我真的想要 int 而不是字节,所以我将代码更改为这个
KafkaAvroDeserializer keyDeserializer = new KafkaAvroDeserializer()
KafkaAvroSerializer keySerializer = new KafkaAvroSerializer();
keyDeserializer.configure(serdeConfig, true);
keySerializer.configure(serdeConfig, true);
Serde<Integer> keySerde = (Serde<Integer>)(Serde)Serdes.serdeFrom(keySerializer, keyDeserializer);
这使编译器产生警告(它不喜欢 (Serde<Integer>)(Serde)
转换)但它允许我使用
Consumed.with(keySerde, userSerde);
并得到一个整数作为键。这工作得很好,我的应用程序按预期运行(太棒了!!!)。但是现在我想为 key/value 定义默认的 serde,但我无法让它工作。
设置默认值 serde 很简单:
streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, SpecificAvroSerde.class);
但是我不知道如何定义默认密钥 serde。
我试过了
streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, keySerde.getClass().getName());
产生运行时错误:无法为 org.apache.kafka.common.serialization.Serdes$WrapperSerde 找到 public 无参数构造函数
streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, SpecificAvroSerde.class);
产生运行时错误:java.lang.Integer 无法转换为 org.apache.avro.specific.SpecificRecord
我错过了什么? 谢谢。
更新 (5.5 及更新版本)
Confluent 版本 5.5
通过 PrimitiveAvroSerde
添加了对原始 Avro 类型的原生支持(参见 https://github.com/confluentinc/schema-registry/blob/5.5.x/avro-serde/src/main/java/io/confluent/kafka/streams/serdes/avro/PrimitiveAvroSerde.java)
原始答案 (5.4 及更早版本):
这是一个已知问题。原始 Avro 类型不适用于 Confluent 的 AvroSerdes,因为 Serdes 仅适用于 GenericAvroRecord
和 SpecificAvroRecord
。
因此,基于 KafkaAvroSerializer
和 KafkaAvroDeserializer
构建您自己的 Serde 是正确的方法。为了能够将其作为默认 Serde 传递到配置中,您不能使用 Serdes.serdeFrom
,因为类型信息由于泛型类型擦除而丢失。
但是,您可以实现自己的 class 来扩展 Serde
接口,并将您的自定义 class 传递到配置中:
public class MySerde extends Serde<Integer> {
// use KafkaAvroSerializer and KafkaAvroDeserializer and cast `Object` to `Integer`
}
config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, MySerde.class);
感谢@Matthias J. Sax 的提示,我想 post 围绕 solution.please 的工作免费增强它。
import java.util.Collections;
import java.util.Map;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.Serializer;
import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
import io.confluent.kafka.serializers.KafkaAvroDeserializer;
import io.confluent.kafka.serializers.KafkaAvroSerializer;
public class GenericPrimitiveAvroSerDe<T> implements Serde<T> {
private final Serde<Object> inner;
/**
* Constructor used by Kafka Streams.
*/
public GenericPrimitiveAvroSerDe() {
inner = Serdes.serdeFrom(new KafkaAvroSerializer(), new KafkaAvroDeserializer());
}
public GenericPrimitiveAvroSerDe(SchemaRegistryClient client) {
this(client, Collections.emptyMap());
}
public GenericPrimitiveAvroSerDe(SchemaRegistryClient client, Map<String, ?> props) {
inner = Serdes.serdeFrom(new KafkaAvroSerializer(client), new KafkaAvroDeserializer(client, props));
}
@Override
public void configure(final Map<String, ?> serdeConfig, final boolean isSerdeForRecordKeys) {
inner.serializer().configure(serdeConfig, isSerdeForRecordKeys);
inner.deserializer().configure(serdeConfig, isSerdeForRecordKeys);
}
@Override
public void close() {
// TODO Auto-generated method stub
inner.serializer().close();
inner.deserializer().close();
}
@SuppressWarnings("unchecked")
@Override
public Serializer<T> serializer() {
// TODO Auto-generated method stub
Object obj = inner.serializer();
return (Serializer<T>) obj;
}
@SuppressWarnings("unchecked")
@Override
public Deserializer<T> deserializer() {
// TODO Auto-generated method stub
Object obj = inner.deserializer();
return (Deserializer<T>) obj;
}
}
用作默认流配置:
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG,GenericPrimitiveAvroSerDe.class);
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG,GenericPrimitiveAvroSerDe.class);
覆盖默认值:
final Map<String, String> serdeConfig = Collections.singletonMap("schema.registry.url",
"http://localhost:8081");
final GenericPrimitiveAvroSerDe<String> keyGenericAvroSerde = new GenericPrimitiveAvroSerDe<String>();
keyGenericAvroSerde.configure(serdeConfig, true); // `true` for record keys
final GenericPrimitiveAvroSerDe<Long> valueGenericAvroSerde = new GenericPrimitiveAvroSerDe<Long>();
valueGenericAvroSerde.configure(serdeConfig, false); // `false` for record values
此外,如果您想将@Thiyaga Rajan 的出色解决方案与 KAFKA 消费者和生产者一起使用
consumerConfig.put(KEY_DESERIALIZER_CLASS_CONFIG,GenericPrimitiveAvroSerDe.class);
producerConfig.put(KEY_SERIALIZER_CLASS_CONFIG, GenericPrimitiveAvroSerDe.class);
将此添加到 class
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
public class GenericPrimitiveAvroSerDe<T> implements Serde<T>, Serializer<T>, Deserializer<T> {
@Override
public T deserialize(String topic, byte[] data) {
return this.deserializer().deserialize(topic, data);
}
@Override
public byte[] serialize(String topic, T data) {
return this.serializer().serialize(topic, data);
}
...
}