Apache Flink:提取元组的类型信息
Apache Flink : Extract TypeInformation of Tuple
我正在使用 FlinkKafkaConsumer09,其中我有一个实现 KeyedDeserializationSchema> 的 ByteArrayDeseializationSchema,现在在 getProducedType 中我如何提取 TypeInformation。
我在文档中读到 TypeExtractor.getForClass 方法不支持 ParameterizedTypes ,我应该使用 TypeExtractor 的哪个方法来实现这个?
我想我们必须使用 createTypeInfo 方法,你能告诉我如何使用它来 return TypeInformation 吗?
如果反序列化模式的 returned 类型是 byte[]
,那么您可以使用 PrimitiveArrayTypeInfo.BYTE_PRIMITIVE_ARRAY_TYPE_INFO
作为 getProducedType
的 return 值。
如果你想要 TypeInformation
的 Tuple2<byte[], byte[]>
:
return TupleTypeInfo.getBasicAndBasicValueTupleTypeInfo(byte[].class, byte[].class)
可能吗?
在 Java API
中键入提示
为了帮助 Flink 无法重建已擦除的泛型类型信息的情况,Java API 从 0.9 版本开始提供所谓的类型提示。类型提示告诉系统函数生成的数据集的类型。下面举个例子:
DataSet<SomeType> result = dataSet
.map(new MyGenericNonInferrableFunction<Long, SomeType>())
.returns(SomeType.class);
returns 语句指定生成的类型,在本例中是通过 class。提示通过
支持类型定义
Classes, for non-parameterized types (no generics)
Strings in the form of returns("Tuple2<Integer, my.SomeType>"), which are parsed and converted to a TypeInformation.
A TypeInformation directly
你可以试试这个:
case class CaseClassForYourExpectedMessage(key:String,value:String)
/* getProducedType method */
override def getProducedType: TypeInformation[CaseClassForYourExpectedMessage] =
TypeExtractor.getForClass(classOf[CaseClassForYourExpectedMessage])
我正在使用 FlinkKafkaConsumer09,其中我有一个实现 KeyedDeserializationSchema> 的 ByteArrayDeseializationSchema,现在在 getProducedType 中我如何提取 TypeInformation。
我在文档中读到 TypeExtractor.getForClass 方法不支持 ParameterizedTypes ,我应该使用 TypeExtractor 的哪个方法来实现这个?
我想我们必须使用 createTypeInfo 方法,你能告诉我如何使用它来 return TypeInformation 吗?
如果反序列化模式的 returned 类型是 byte[]
,那么您可以使用 PrimitiveArrayTypeInfo.BYTE_PRIMITIVE_ARRAY_TYPE_INFO
作为 getProducedType
的 return 值。
如果你想要 TypeInformation
的 Tuple2<byte[], byte[]>
:
return TupleTypeInfo.getBasicAndBasicValueTupleTypeInfo(byte[].class, byte[].class)
可能吗?
在 Java API
中键入提示为了帮助 Flink 无法重建已擦除的泛型类型信息的情况,Java API 从 0.9 版本开始提供所谓的类型提示。类型提示告诉系统函数生成的数据集的类型。下面举个例子:
DataSet<SomeType> result = dataSet
.map(new MyGenericNonInferrableFunction<Long, SomeType>())
.returns(SomeType.class);
returns 语句指定生成的类型,在本例中是通过 class。提示通过
支持类型定义Classes, for non-parameterized types (no generics)
Strings in the form of returns("Tuple2<Integer, my.SomeType>"), which are parsed and converted to a TypeInformation.
A TypeInformation directly
你可以试试这个:
case class CaseClassForYourExpectedMessage(key:String,value:String)
/* getProducedType method */
override def getProducedType: TypeInformation[CaseClassForYourExpectedMessage] =
TypeExtractor.getForClass(classOf[CaseClassForYourExpectedMessage])