How to deserialize Avro enum type in Flink SQL?
I have a Kafka topic with the following Avro IDL, registered in the Schema Registry.
@namespace("my.type.avro")
protocol MyProtocol {
    enum MyEnumType {
        Type1, Type2
    }
    record MyEntry {
        MyEnumType myEntryType = "Type1";
    }
    record MyRecord {
        MyEntry entry;
    }
}
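For context, the IDL above compiles to roughly the following Avro schema for MyRecord (my own rendering, not pulled from the registry). Note that myEntryType is a named enum type rather than a union, which is exactly what the schema-resolution error further down complains about:

{
  "type": "record",
  "name": "MyRecord",
  "namespace": "my.type.avro",
  "fields": [
    {"name": "entry", "type": {
      "type": "record", "name": "MyEntry",
      "fields": [
        {"name": "myEntryType",
         "type": {"type": "enum", "name": "MyEnumType", "symbols": ["Type1", "Type2"]},
         "default": "Type1"}
      ]}}
  ]
}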
To read the topic, I defined the following DDL:
CREATE TABLE my_table
(
    `entry` ROW(`myEntryType` ROW(???))
) WITH (
    'connector' = 'kafka',
    'topic' = 'my-topic',
    'properties.bootstrap.servers' = '...:9092',
    'scan.startup.mode' = 'latest-offset',
    'value.format' = 'avro-confluent',
    'value.avro-confluent.schema-registry.url' = 'http://...:8081'
)
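For reference, the concrete variant that produces the stack trace below replaces the ??? placeholder with STRING (my reconstruction from the description that follows):

    `entry` ROW(`myEntryType` STRING)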
I then run the following query:
SELECT * FROM my_table
With Flink 1.13.1 I now get the following error when I use the STRING type:
Caused by: java.io.IOException: Failed to deserialize Avro record.
    at org.apache.flink.formats.avro.AvroRowDataDeserializationSchema.deserialize(AvroRowDataDeserializationSchema.java:106)
    at org.apache.flink.formats.avro.AvroRowDataDeserializationSchema.deserialize(AvroRowDataDeserializationSchema.java:46)
    at org.apache.flink.api.common.serialization.DeserializationSchema.deserialize(DeserializationSchema.java:82)
    at org.apache.flink.streaming.connectors.kafka.table.DynamicKafkaDeserializationSchema.deserialize(DynamicKafkaDeserializationSchema.java:113)
    at org.apache.flink.streaming.connectors.kafka.internals.KafkaFetcher.partitionConsumerRecordsHandler(KafkaFetcher.java:179)
    at org.apache.flink.streaming.connectors.kafka.internals.KafkaFetcher.runFetchLoop(KafkaFetcher.java:142)
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:826)
    at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:110)
    at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:66)
    at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:269)
Caused by: org.apache.avro.AvroTypeException: Found my.type.avro.MyEnumType, expecting union
    at org.apache.avro.io.ResolvingDecoder.doAction(ResolvingDecoder.java:308)
    at org.apache.avro.io.parsing.Parser.advance(Parser.java:86)
    at org.apache.avro.io.ResolvingDecoder.readIndex(ResolvingDecoder.java:275)
    at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:1
...
I have also tried RAW('class', 'snapshot'), with 'class' set to my.type.avro.MyEnumType, but I could not find a suitable snapshot serializer. I tried quite a few, e.g. PojoSerializerSnapshot, KryoSerializer.KryoSerializerConfigSnapshot, StringSerializer, AvroSerializer and so on, but none of them worked.
Yes, the current workaround is to read the data with the DataStream API, configuring the format with your own custom Avro schema, and then switch to the Table API, as described in this thread: https://www.mail-archive.com/user@flink.apache.org/msg44520.html
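A minimal sketch of that workaround, assuming Flink 1.13.x with flink-avro, flink-avro-confluent-registry and flink-connector-kafka on the classpath. The schema string, group id, field mapping and the EnumWorkaround class are illustrative placeholders, not code from the thread:

import java.util.Properties;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.formats.avro.registry.confluent.ConfluentRegistryAvroDeserializationSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

public class EnumWorkaround {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

        // The full writer schema, enum included; GenericDatumReader handles
        // enums fine, it is only the SQL format that rejects them.
        Schema schema = new Schema.Parser().parse(
            "{\"type\":\"record\",\"name\":\"MyRecord\",\"namespace\":\"my.type.avro\","
          + "\"fields\":[{\"name\":\"entry\",\"type\":{\"type\":\"record\",\"name\":\"MyEntry\","
          + "\"fields\":[{\"name\":\"myEntryType\",\"type\":{\"type\":\"enum\","
          + "\"name\":\"MyEnumType\",\"symbols\":[\"Type1\",\"Type2\"]},\"default\":\"Type1\"}]}}]}");

        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "...:9092"); // placeholder, as above
        props.setProperty("group.id", "my-group");          // hypothetical group id

        // Reads Confluent-framed Avro (magic byte + schema id) into GenericRecords.
        FlinkKafkaConsumer<GenericRecord> consumer = new FlinkKafkaConsumer<>(
            "my-topic",
            ConfluentRegistryAvroDeserializationSchema.forGeneric(schema, "http://...:8081"),
            props);

        // Flatten the record and render the enum symbol as a plain string.
        DataStream<Row> rows = env.addSource(consumer)
            .map(record -> {
                GenericRecord entry = (GenericRecord) record.get("entry");
                return Row.of(entry.get("myEntryType").toString());
            })
            .returns(Types.ROW_NAMED(new String[] {"myEntryType"}, Types.STRING));

        // Switch to the Table API; myEntryType is now an ordinary STRING column.
        Table myTable = tEnv.fromDataStream(rows);
        tEnv.createTemporaryView("my_table", myTable);
        tEnv.executeSql("SELECT * FROM my_table").print();
    }
}

The map step is where the enum is dealt with: toString() on the deserialized enum symbol yields the symbol name, so the Table side only ever sees a STRING column and the DDL question disappears.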
In addition, the following Jira ticket has been opened to properly support enums through the Table API: https://issues.apache.org/jira/browse/FLINK-24544