如何将模式附加到 Flink DataStream - 即时?

How to attach schema to a Flink DataStream - on the fly?

我正在处理数据库变更流,即更改日志流。我希望能够使用 SQL 查询来转换值。 我很难将以下三个概念放在一起 RowTypeInfoRowDataStream

注意:我事先不知道架构。我使用 Mutation 对象中的数据即时构建它(Mutation 是自定义类型)

更具体地说,我有这样的代码。

val execEnv = StreamExecutionEnvironment.getExecutionEnvironment
val tableEnv: StreamTableEnvironment = TableEnvironment.getTableEnvironment(execEnv)

// Mutation is a custom type
val mutationStream: DataStream[Mutation] = ...
// toRows returns an object of type org.apache.flink.types.Row
val rowStream:DataStream[Row] = mutationStream.flatMap({mutation => toRows(mutation)})
tableEnv.registerDataStream("spinal_tap_table", rowStream)
tableEnv.sql("select col1 + 2")

注意:Row 对象是位置对象,没有列名的占位符。 我找不到将架构附加到 DataStream 对象的位置。

我想传递某种类似于 Row 的结构,其中包含查询的完整信息 {columnName: String, columnValue: Object, columnType: TypeInformation[_]}

在 Flink SQL 中,当定义 Table 时,table 模式是强制性的。无法 运行 查询动态类型的记录。

关于RowTypeInfoRowDataStream的概念:

  • Row 是保存数据的实际记录
  • RowTypeInfoRow 的模式描述。它包含 Row.
  • 的每个字段的名称和 TypeInformation
  • DataStream 是一个逻辑记录流。 DataStream[Row] 是行流。请注意,这不是实际的流,而只是代表 API.
  • 中的流的 API 概念