如何将模式附加到 Flink DataStream - 即时?
How to attach schema to a Flink DataStream - on the fly?
我正在处理数据库变更流,即更改日志流。我希望能够使用 SQL 查询来转换值。
我很难将以下三个概念放在一起
RowTypeInfo
、Row
和 DataStream
。
注意:我事先不知道架构。我使用 Mutation
对象中的数据即时构建它(Mutation
是自定义类型)
更具体地说,我有这样的代码。
val execEnv = StreamExecutionEnvironment.getExecutionEnvironment
val tableEnv: StreamTableEnvironment = TableEnvironment.getTableEnvironment(execEnv)
// Mutation is a custom type
val mutationStream: DataStream[Mutation] = ...
// toRows returns an object of type org.apache.flink.types.Row
val rowStream:DataStream[Row] = mutationStream.flatMap({mutation => toRows(mutation)})
tableEnv.registerDataStream("spinal_tap_table", rowStream)
tableEnv.sql("select col1 + 2")
注意:Row
对象是位置对象,没有列名的占位符。
我找不到将架构附加到 DataStream
对象的位置。
我想传递某种类似于 Row
的结构,其中包含查询的完整信息 {columnName: String, columnValue: Object, columnType: TypeInformation[_]}
。
在 Flink SQL 中,当定义 Table
时,table 模式是强制性的。无法 运行 查询动态类型的记录。
关于RowTypeInfo
、Row
和DataStream
的概念:
Row
是保存数据的实际记录
RowTypeInfo
是 Row
的模式描述。它包含 Row
. 的每个字段的名称和 TypeInformation
DataStream
是一个逻辑记录流。 DataStream[Row]
是行流。请注意,这不是实际的流,而只是代表 API. 中的流的 API 概念
我正在处理数据库变更流,即更改日志流。我希望能够使用 SQL 查询来转换值。
我很难将以下三个概念放在一起
RowTypeInfo
、Row
和 DataStream
。
注意:我事先不知道架构。我使用 Mutation
对象中的数据即时构建它(Mutation
是自定义类型)
更具体地说,我有这样的代码。
val execEnv = StreamExecutionEnvironment.getExecutionEnvironment
val tableEnv: StreamTableEnvironment = TableEnvironment.getTableEnvironment(execEnv)
// Mutation is a custom type
val mutationStream: DataStream[Mutation] = ...
// toRows returns an object of type org.apache.flink.types.Row
val rowStream:DataStream[Row] = mutationStream.flatMap({mutation => toRows(mutation)})
tableEnv.registerDataStream("spinal_tap_table", rowStream)
tableEnv.sql("select col1 + 2")
注意:Row
对象是位置对象,没有列名的占位符。
我找不到将架构附加到 DataStream
对象的位置。
我想传递某种类似于 Row
的结构,其中包含查询的完整信息 {columnName: String, columnValue: Object, columnType: TypeInformation[_]}
。
在 Flink SQL 中,当定义 Table
时,table 模式是强制性的。无法 运行 查询动态类型的记录。
关于RowTypeInfo
、Row
和DataStream
的概念:
Row
是保存数据的实际记录RowTypeInfo
是Row
的模式描述。它包含Row
. 的每个字段的名称和 DataStream
是一个逻辑记录流。DataStream[Row]
是行流。请注意,这不是实际的流,而只是代表 API. 中的流的 API 概念
TypeInformation