如何在 Flink Operator 中使用 Avro Generated 类 with children 作为 UNION
How to use Avro Generated Classes with children as UNION in Flink Operator
我正在尝试构建 Avro 生成记录的列表,同时对从 Kafka 源中提取的 Kafka 记录进行迭代 .map()
。
我遇到的问题是我必须在该 Kafka 主题上处理多种类型的事件,所以我最终得到了一个 GenricType(schema/avro 生成的 object) UNION 字段 ('data').
在处理这些记录并尝试构建结果时,我进行了调试并在 PojoType 验证阶段结束,并且由于 class GenericType
声明了 child作为 UNION,该字段变为:private java.lang.Object data;
在处理该字段时,在 PojoType 验证器中,抛出异常:
Exception in thread "main" java.lang.IllegalStateException: Expecting type to be a PojoTypeInfo
我的 GenericType
java class (已生成,确实显式扩展 com.avro.specific.SpecificRecordBase
但问题仍然存在,因为它的字段类型为 java.lang.Object
.
这是导致问题的代码:
SingleOutputStreamOperator<GenericType> producedRecords =
kafkaRecords
.map(
value -> {
String kafkaKey = value.get(KEY).asText();
String kafkaRecordJson = MAPPER.writeValueAsString(value.get(VALUE));
return (GenericType) Converter.convert(kafkaKey, kafkaRecordJson);
})
.returns(
TypeInformation.of(
new TypeHint<>() {
@Override
public TypeInformation<GenericType> getTypeInfo() {
return super.getTypeInfo();
}
}));
Avro 模式:
{
"type": "record",
"name": "GenericType",
"namespace": "com.zzz.yyy",
"fields": [
{
"name": "data",
"type": [
"com.zzz.yyy.Schema1",
"com.zzz.yyy.Schema2"
]
}
]
}
我也试过这样的 Avro 模式:
[
"com.zzz.yyy.Schema1",
"com.zzz.yyy.Schema2"
]
所以这只是通用类型 object 的 UNION,但我无法使生成 object 的 avro 插件真正起作用。总是声明架构无效。
此架构将生成如下所示的 java object(显然清除了 avro 添加的样板代码)- 值得一提的是下面的 class,确实
扩展 SpecificRecordBase - 只是为了排除这是异常的问题。
public class GenericType {
// boiler plate here
private java.lang.Object data;
// boiler plate here
}
这就是实际问题,像我说的那样进行调试时,在验证 object 中的字段时,'data' 字段并不好,因为它不是原始类型或 POJO类型(作为 object),它不遵守某些规则(必须有无参数构造函数、getter、setter 等)
试图弄清楚我如何生成那些 Avro Object,或者我可以使用什么来代替我工作中的通用 Avro,这样我就可以继续处理那个异常 - 老实说,通过观察在那里验证,我不确定这怎么可能,因为同样,Avro 插件将始终生成一个字段作为 UNION 的 java.lang.Object
。
更多上下文:
- 使用 schema-registry 注册的 Avro 模式。
- 生成的 Avro objects 发送到 kafka 接收器。
这只是一个愚蠢的问题,用于生成 avro 类 的 mvn 插件设置了一个标志以不创建 setter。添加之后,所有验证都通过了 Avro Pojos,因此流程成功。
我正在尝试构建 Avro 生成记录的列表,同时对从 Kafka 源中提取的 Kafka 记录进行迭代 .map()
。
我遇到的问题是我必须在该 Kafka 主题上处理多种类型的事件,所以我最终得到了一个 GenricType(schema/avro 生成的 object) UNION 字段 ('data').
在处理这些记录并尝试构建结果时,我进行了调试并在 PojoType 验证阶段结束,并且由于 class GenericType
声明了 child作为 UNION,该字段变为:private java.lang.Object data;
在处理该字段时,在 PojoType 验证器中,抛出异常:
Exception in thread "main" java.lang.IllegalStateException: Expecting type to be a PojoTypeInfo
我的 GenericType
java class (已生成,确实显式扩展 com.avro.specific.SpecificRecordBase
但问题仍然存在,因为它的字段类型为 java.lang.Object
.
这是导致问题的代码:
SingleOutputStreamOperator<GenericType> producedRecords =
kafkaRecords
.map(
value -> {
String kafkaKey = value.get(KEY).asText();
String kafkaRecordJson = MAPPER.writeValueAsString(value.get(VALUE));
return (GenericType) Converter.convert(kafkaKey, kafkaRecordJson);
})
.returns(
TypeInformation.of(
new TypeHint<>() {
@Override
public TypeInformation<GenericType> getTypeInfo() {
return super.getTypeInfo();
}
}));
Avro 模式:
{
"type": "record",
"name": "GenericType",
"namespace": "com.zzz.yyy",
"fields": [
{
"name": "data",
"type": [
"com.zzz.yyy.Schema1",
"com.zzz.yyy.Schema2"
]
}
]
}
我也试过这样的 Avro 模式:
[
"com.zzz.yyy.Schema1",
"com.zzz.yyy.Schema2"
]
所以这只是通用类型 object 的 UNION,但我无法使生成 object 的 avro 插件真正起作用。总是声明架构无效。
此架构将生成如下所示的 java object(显然清除了 avro 添加的样板代码)- 值得一提的是下面的 class,确实 扩展 SpecificRecordBase - 只是为了排除这是异常的问题。
public class GenericType {
// boiler plate here
private java.lang.Object data;
// boiler plate here
}
这就是实际问题,像我说的那样进行调试时,在验证 object 中的字段时,'data' 字段并不好,因为它不是原始类型或 POJO类型(作为 object),它不遵守某些规则(必须有无参数构造函数、getter、setter 等)
试图弄清楚我如何生成那些 Avro Object,或者我可以使用什么来代替我工作中的通用 Avro,这样我就可以继续处理那个异常 - 老实说,通过观察在那里验证,我不确定这怎么可能,因为同样,Avro 插件将始终生成一个字段作为 UNION 的 java.lang.Object
。
更多上下文:
- 使用 schema-registry 注册的 Avro 模式。
- 生成的 Avro objects 发送到 kafka 接收器。
这只是一个愚蠢的问题,用于生成 avro 类 的 mvn 插件设置了一个标志以不创建 setter。添加之后,所有验证都通过了 Avro Pojos,因此流程成功。