Is it possible to deserialize an Avro message (consumed from Kafka) without providing the reader schema in ConfluentRegistryAvroDeserializationSchema?
I am using the Kafka connector in Apache Flink to consume a stream from Confluent Kafka.
Apart from the schema registry URL, ConfluentRegistryAvroDeserializationSchema.forGeneric(...) also requires a 'reader' schema.
Rather than providing a reader schema, I want to deserialize each message with the same writer's schema it was produced with (looked up from the registry), because the consumer will not always have the latest schema.
FlinkKafkaConsumer010<GenericRecord> myConsumer =
    new FlinkKafkaConsumer010<>(
        "topic-name",
        ConfluentRegistryAvroDeserializationSchema.forGeneric(<reader schema goes here>, "http://host:port"),
        properties);
myConsumer.setStartFromLatest();
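For reference, this is what filling in the reader schema would look like; a minimal sketch, assuming the schema JSON is kept as a string on the consumer side (the Event record definition here is a hypothetical example). This local copy of the schema is exactly what I want to avoid maintaining:

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.flink.formats.avro.registry.confluent.ConfluentRegistryAvroDeserializationSchema;

// Hypothetical record definition -- the schema copy I do not want to maintain.
Schema readerSchema = new Schema.Parser().parse(
    "{\"type\":\"record\",\"name\":\"Event\","
        + "\"fields\":[{\"name\":\"id\",\"type\":\"string\"}]}");

FlinkKafkaConsumer010<GenericRecord> myConsumer =
    new FlinkKafkaConsumer010<>(
        "topic-name",
        ConfluentRegistryAvroDeserializationSchema.forGeneric(readerSchema, "http://host:port"),
        properties);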
From the Flink docs (https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/connectors/kafka.html):
"Using these deserialization schema record will be read with the schema that was retrieved from Schema Registry and transformed to a statically provided"
Since I don't want to keep a schema definition on the consumer side, how do I deserialize an Avro message from Kafka using the writer's schema?
Thanks for your help!
I don't think it is possible to use ConfluentRegistryAvroDeserializationSchema.forGeneric directly. It is meant to be used with a reader schema, and there is a precondition check for that.
You have to implement it yourself. Two important things, both visible in the implementation below:
- set specific.avro.reader to false (otherwise you will get specific records)
- the KafkaAvroDeserializer must be lazily initialized (it is not serializable itself, because it holds a reference to the schema registry client)
import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient;
import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
import io.confluent.kafka.serializers.AbstractKafkaAvroSerDeConfig;
import io.confluent.kafka.serializers.KafkaAvroDeserializer;
import io.confluent.kafka.serializers.KafkaAvroDeserializerConfig;
import java.util.HashMap;
import java.util.Map;
import org.apache.avro.generic.GenericRecord;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;

public class KafkaGenericAvroDeserializationSchema
        implements KeyedDeserializationSchema<GenericRecord> {

    private final String registryUrl;

    // Transient: KafkaAvroDeserializer is not serializable, so it is created
    // lazily on the task managers instead of being shipped from the client.
    private transient KafkaAvroDeserializer inner;

    public KafkaGenericAvroDeserializationSchema(String registryUrl) {
        this.registryUrl = registryUrl;
    }

    @Override
    public GenericRecord deserialize(
            byte[] messageKey, byte[] message, String topic, int partition, long offset) {
        checkInitialized();
        return (GenericRecord) inner.deserialize(topic, message);
    }

    @Override
    public boolean isEndOfStream(GenericRecord nextElement) {
        return false;
    }

    @Override
    public TypeInformation<GenericRecord> getProducedType() {
        return TypeExtractor.getForClass(GenericRecord.class);
    }

    private void checkInitialized() {
        if (inner == null) {
            Map<String, Object> props = new HashMap<>();
            props.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, registryUrl);
            // false => deserialize to GenericRecord instead of generated specific classes
            props.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, false);
            SchemaRegistryClient client =
                new CachedSchemaRegistryClient(
                    registryUrl, AbstractKafkaAvroSerDeConfig.MAX_SCHEMAS_PER_SUBJECT_DEFAULT);
            inner = new KafkaAvroDeserializer(client, props);
        }
    }
}
env.addSource(
    new FlinkKafkaConsumer<>(
        topic,
        new KafkaGenericAvroDeserializationSchema(schemaRegistryUrl),
        kafkaProperties));
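This works without a reader schema because every message written by the Confluent serializer starts with the ID of the schema it was written with, and the KafkaAvroDeserializer fetches that writer schema from the registry. Roughly, it does the equivalent of the following (a simplified sketch of the mechanism, not the library's actual code; getById is the lookup call on the schema registry client of that generation, and message/client refer to the fields used in the class above):

import java.nio.ByteBuffer;
import org.apache.avro.Schema;

// Confluent wire format: 1 magic byte (0x0), 4-byte schema id, then the Avro binary payload.
ByteBuffer buffer = ByteBuffer.wrap(message);
byte magicByte = buffer.get();                   // always 0x0 for Confluent-framed messages
int schemaId = buffer.getInt();                  // registry id of the writer schema
Schema writerSchema = client.getById(schemaId);  // cached lookup via CachedSchemaRegistryClient
// The remaining bytes are decoded with a GenericDatumReader<GenericRecord>(writerSchema),
// which is why the consumer never needs a locally maintained schema definition.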