How to deserialize Avro enum type in Flink SQL?

I have a Kafka topic whose records use the following Avro IDL, and the schema is registered in the Schema Registry.

    @namespace("my.type.avro")
    protocol MyProtocol {
      enum MyEnumType {
        Type1, Type2
      }

      record MyEntry {
        MyEnumType myEntryType = "Type1";
      }

      record MyRecord {
        MyEntry entry;
      }
    }

To read the topic, I defined the following DDL:

    CREATE TABLE my_table (
      `entry` ROW(`myEntryType` ROW(???))
    ) WITH (
      'connector' = 'kafka',
      'topic' = 'my-topic',
      'properties.bootstrap.servers' = '...:9092',
      'scan.startup.mode' = 'latest-offset',
      'value.format' = 'avro-confluent',
      'value.avro-confluent.schema-registry.url' = 'http://...:8081'
    )

Then I run the following query:

    SELECT * FROM my_table

When I use the STRING type for the enum field, I get the following error in Flink 1.13.1:

    Caused by: java.io.IOException: Failed to deserialize Avro record.
        at org.apache.flink.formats.avro.AvroRowDataDeserializationSchema.deserialize(AvroRowDataDeserializationSchema.java:106)
        at org.apache.flink.formats.avro.AvroRowDataDeserializationSchema.deserialize(AvroRowDataDeserializationSchema.java:46)
        at org.apache.flink.api.common.serialization.DeserializationSchema.deserialize(DeserializationSchema.java:82)
        at org.apache.flink.streaming.connectors.kafka.table.DynamicKafkaDeserializationSchema.deserialize(DynamicKafkaDeserializationSchema.java:113)
        at org.apache.flink.streaming.connectors.kafka.internals.KafkaFetcher.partitionConsumerRecordsHandler(KafkaFetcher.java:179)
        at org.apache.flink.streaming.connectors.kafka.internals.KafkaFetcher.runFetchLoop(KafkaFetcher.java:142)
        at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:826)
        at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:110)
        at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:66)
        at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:269)
    Caused by: org.apache.avro.AvroTypeException: Found my.type.avro.MyEnumType, expecting union
        at org.apache.avro.io.ResolvingDecoder.doAction(ResolvingDecoder.java:308)
        at org.apache.avro.io.parsing.Parser.advance(Parser.java:86)
        at org.apache.avro.io.ResolvingDecoder.readIndex(ResolvingDecoder.java:275)
        at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:1
        ...

I also tried RAW('class', 'snapshot'), where 'class' is my.type.avro.MyEnumType, but I could not find a suitable serializer snapshot. I tried many, such as PojoSerializerSnapshot, KryoSerializer.KryoSerializerConfigSnapshot, StringSerializer, and AvroSerializer, and none of them worked.

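Yes, the current workaround is to read the data with the DataStream API and supply your own Avro schema to configure the format, then switch to the Table API, as described in this thread: https://www.mail-archive.com/user@flink.apache.org/msg44520.html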
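
The workaround described above could be sketched roughly as follows for Flink 1.13. This is only an illustration under several assumptions, not the code from the linked thread: the class name `EnumWorkaroundSketch`, the consumer group `my-group`, and the alias `myEntryType` are made up here, the JSON schema is a hand-written equivalent of the IDL in the question, and the job is assumed to have `flink-connector-kafka`, `flink-avro-confluent-registry`, and the Java Table API bridge on its classpath. The elided host names are left as they appear in the question.

```java
import java.util.Properties;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.formats.avro.registry.confluent.ConfluentRegistryAvroDeserializationSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

// Hypothetical class name; a sketch, not a definitive implementation.
public class EnumWorkaroundSketch {

    // Assumed JSON form of the MyRecord schema from the Avro IDL in the question.
    private static final String SCHEMA_JSON =
        "{\"type\":\"record\",\"name\":\"MyRecord\",\"namespace\":\"my.type.avro\",\"fields\":["
      + "{\"name\":\"entry\",\"type\":{\"type\":\"record\",\"name\":\"MyEntry\",\"fields\":["
      + "{\"name\":\"myEntryType\",\"type\":{\"type\":\"enum\",\"name\":\"MyEnumType\","
      + "\"symbols\":[\"Type1\",\"Type2\"]},\"default\":\"Type1\"}]}}]}";

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        // Use the real Avro schema (with the enum) as the reader schema,
        // instead of letting Flink SQL derive one from the table DDL.
        Schema schema = new Schema.Parser().parse(SCHEMA_JSON);

        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "...:9092"); // elided in the question
        props.setProperty("group.id", "my-group");          // assumed group id

        FlinkKafkaConsumer<GenericRecord> consumer = new FlinkKafkaConsumer<>(
            "my-topic",
            ConfluentRegistryAvroDeserializationSchema.forGeneric(schema, "http://...:8081"),
            props);
        consumer.setStartFromLatest();

        // Convert the enum symbol to a plain String before handing the stream to the Table API.
        DataStream<String> entryTypes = env
            .addSource(consumer)
            .map(rec -> {
                GenericRecord entry = (GenericRecord) rec.get("entry");
                return entry.get("myEntryType").toString();
            })
            .returns(Types.STRING);

        // A DataStream<String> is exposed as a table with a single column named f0.
        tableEnv.createTemporaryView("my_table", entryTypes);
        tableEnv.executeSql("SELECT f0 AS myEntryType FROM my_table").print();
    }
}
```

Mapping the enum to a String in the DataStream step keeps the SQL side to types the Table API already supports. A record with more fields would need a richer mapping, for example to a `Row` with explicit type information.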

In addition, the following Jira ticket has been opened to add enum support in the Table API: https://issues.apache.org/jira/browse/FLINK-24544