如何在 Flink Operator 中使用 Avro Generated 类 with children 作为 UNION

How to use Avro Generated Classes with children as UNION in Flink Operator

我正在尝试构建 Avro 生成记录的列表,同时对从 Kafka 源中提取的 Kafka 记录进行迭代 .map()

我遇到的问题是我必须在该 Kafka 主题上处理多种类型的事件,所以我最终得到了一个 GenricType(schema/avro 生成的 object) UNION 字段 ('data').

在处理这些记录并尝试构建结果时,我进行了调试并在 PojoType 验证阶段结束,并且由于 class GenericType 声明了 child作为 UNION,该字段变为:private java.lang.Object data;

在处理该字段时,在 PojoType 验证器中,抛出异常:

Exception in thread "main" java.lang.IllegalStateException: Expecting type to be a PojoTypeInfo

我的 GenericType java class (已生成,确实显式扩展 com.avro.specific.SpecificRecordBase 但问题仍然存在,因为它的字段类型为 java.lang.Object.

这是导致问题的代码:

SingleOutputStreamOperator<GenericType> producedRecords =
      kafkaRecords
          .map(
              value -> {
                String kafkaKey = value.get(KEY).asText();
                String kafkaRecordJson = MAPPER.writeValueAsString(value.get(VALUE));
                return (GenericType) Converter.convert(kafkaKey, kafkaRecordJson);
              })
          .returns(
              TypeInformation.of(
                  new TypeHint<>() {
                    @Override
                    public TypeInformation<GenericType> getTypeInfo() {
                      return super.getTypeInfo();
                    }
          }));

Avro 模式:

{
  "type": "record",
  "name": "GenericType",
  "namespace": "com.zzz.yyy",
  "fields": [
    {
      "name": "data",
      "type": [
        "com.zzz.yyy.Schema1",
        "com.zzz.yyy.Schema2"
      ]
    }
  ]
}

我也试过这样的 Avro 模式:

[
    "com.zzz.yyy.Schema1",
    "com.zzz.yyy.Schema2"
]

所以这只是通用类型 object 的 UNION,但我无法使生成 object 的 avro 插件真正起作用。总是声明架构无效。

此架构将生成如下所示的 java object(显然清除了 avro 添加的样板代码)- 值得一提的是下面的 class,确实 扩展 SpecificRecordBase - 只是为了排除这是异常的问题。

public class GenericType {
    // boiler plate here
    private java.lang.Object data;
    // boiler plate here
}

这就是实际问题,像我说的那样进行调试时,在验证 object 中的字段时,'data' 字段并不好,因为它不是原始类型或 POJO类型(作为 object),它不遵守某些规则(必须有无参数构造函数、getter、setter 等)

试图弄清楚我如何生成那些 Avro Object,或者我可以使用什么来代替我工作中的通用 Avro,这样我就可以继续处理那个异常 - 老实说,通过观察在那里验证,我不确定这怎么可能,因为同样,Avro 插件将始终生成一个字段作为 UNION 的 java.lang.Object

更多上下文:

这只是一个愚蠢的问题,用于生成 avro 类 的 mvn 插件设置了一个标志以不创建 setter。添加之后,所有验证都通过了 Avro Pojos,因此流程成功。