使用 KafkaAvroDecoder 将 Avro 消息反序列化为特定数据
Deserialize Avro messages into specific datum using KafkaAvroDecoder
我正在阅读 Kafka 主题,其中包含使用 KafkaAvroEncoder
(自动将模式注册到主题)序列化的 Avro 消息。我正在使用 maven-avro-plugin 生成普通的 Java 类,我想在阅读时使用它。
KafkaAvroDecoder
仅支持反序列化为 GenericData.Record
类型,这(在我看来)忽略了拥有静态类型语言的全部意义。我的反序列化代码目前看起来像这样:
SpecificDatumReader<event> reader = new SpecificDatumReader<>(
event.getClassSchema() // event is my class generated from the schema
);
byte[] in = ...; // my input bytes;
ByteBuffer stuff = ByteBuffer.wrap(in);
// the KafkaAvroEncoder puts a magic byte and the ID of the schema (as stored
// in the schema-registry) before the serialized message
if (stuff.get() != 0x0) {
return;
}
int id = stuff.getInt();
// lets just ignore those special bytes
int length = stuff.limit() - 4 - 1;
int start = stuff.position() + stuff.arrayOffset();
Decoder decoder = DecoderFactory.get().binaryDecoder(
stuff.array(), start, length, null
);
try {
event ev = reader.read(null, decoder);
} catch (IOException e) {
e.printStackTrace();
}
我发现我的解决方案很麻烦,所以我想知道是否有更简单的解决方案。
多亏了评论,我才能找到答案。秘诀是用 Properties
实例化 KafkaAvroDecoder
指定特定 Avro reader 的使用,即:
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "...");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
io.confluent.kafka.serializers.KafkaAvroSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
io.confluent.kafka.serializers.KafkaAvroSerializer.class);
props.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, "...");
props.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, true);
VerifiableProp vProps = new VerifiableProperties(props);
KafkaAvroDecoder decoder = new KafkaAvroDecoder(vProps);
MyLittleData data = (MyLittleData) decoder.fromBytes(input);
相同的配置适用于直接使用 KafkaConsumer<K, V>
class 的情况(我在 Storm 中使用来自 storm-kafka 项目的 KafkaSpout
从 Kafka 消费,这使用 SimpleConsumer
,所以我必须手动反序列化消息。如果有勇气,可以使用 storm-kafka-client 项目,它使用新样式的消费者自动执行此操作)。
我正在阅读 Kafka 主题,其中包含使用 KafkaAvroEncoder
(自动将模式注册到主题)序列化的 Avro 消息。我正在使用 maven-avro-plugin 生成普通的 Java 类,我想在阅读时使用它。
KafkaAvroDecoder
仅支持反序列化为 GenericData.Record
类型,这(在我看来)忽略了拥有静态类型语言的全部意义。我的反序列化代码目前看起来像这样:
SpecificDatumReader<event> reader = new SpecificDatumReader<>(
event.getClassSchema() // event is my class generated from the schema
);
byte[] in = ...; // my input bytes;
ByteBuffer stuff = ByteBuffer.wrap(in);
// the KafkaAvroEncoder puts a magic byte and the ID of the schema (as stored
// in the schema-registry) before the serialized message
if (stuff.get() != 0x0) {
return;
}
int id = stuff.getInt();
// lets just ignore those special bytes
int length = stuff.limit() - 4 - 1;
int start = stuff.position() + stuff.arrayOffset();
Decoder decoder = DecoderFactory.get().binaryDecoder(
stuff.array(), start, length, null
);
try {
event ev = reader.read(null, decoder);
} catch (IOException e) {
e.printStackTrace();
}
我发现我的解决方案很麻烦,所以我想知道是否有更简单的解决方案。
多亏了评论,我才能找到答案。秘诀是用 Properties
实例化 KafkaAvroDecoder
指定特定 Avro reader 的使用,即:
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "...");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
io.confluent.kafka.serializers.KafkaAvroSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
io.confluent.kafka.serializers.KafkaAvroSerializer.class);
props.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, "...");
props.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, true);
VerifiableProp vProps = new VerifiableProperties(props);
KafkaAvroDecoder decoder = new KafkaAvroDecoder(vProps);
MyLittleData data = (MyLittleData) decoder.fromBytes(input);
相同的配置适用于直接使用 KafkaConsumer<K, V>
class 的情况(我在 Storm 中使用来自 storm-kafka 项目的 KafkaSpout
从 Kafka 消费,这使用 SimpleConsumer
,所以我必须手动反序列化消息。如果有勇气,可以使用 storm-kafka-client 项目,它使用新样式的消费者自动执行此操作)。