Flink 将映射的 Row 解释为单个 RAW
Flink interprets a mapped Row as a single RAW
我能够将静态行汇入数据库:
DataStream<Row> staticRows = environment.fromElements("value1", "value2")
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(environment); // convert to table API
Table inputTable = tableEnv.fromDataStream(staticRows);
tableEnv.executeSql(myDDLAndSinkProperties);
inputTable.executeInsert("MYTABLE");
但是像这样将无界流映射到行:
DataStream<Row> kafkaRows = kafkaEvents.map(new MyKafkaRecordToRowMapper());
尝试插入数据库时,输入和接收器架构不匹配时引发错误。
查询架构:[f0: RAW('org.apache.flink.types.Row', '...')]
相同的代码适用于 POJO 和元组,但我有超过 25 列,并且 POJO 没有任何其他用途 - 所以我希望它可以被通用字段序列替换(Row 声称成为)。
如何使用 Row 输入到数据库中?给出的示例仅显示它用于静态数据流和数据库输出。
我认为如果将其更改为这样的内容(当然是在调整列名称和类型之后),效果会更好:
DataStream<Row> kafkaRows = kafkaEvents
.map(new MyKafkaRecordToRowMapper())
.returns(Types.ROW_NAMED(
new String[] {"id", "quota", "ts", ...},
Types.STRING,
Types.LONG,
TypeInformation.of(Instant.class)),
...);
我能够将静态行汇入数据库:
DataStream<Row> staticRows = environment.fromElements("value1", "value2")
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(environment); // convert to table API
Table inputTable = tableEnv.fromDataStream(staticRows);
tableEnv.executeSql(myDDLAndSinkProperties);
inputTable.executeInsert("MYTABLE");
但是像这样将无界流映射到行:
DataStream<Row> kafkaRows = kafkaEvents.map(new MyKafkaRecordToRowMapper());
尝试插入数据库时,输入和接收器架构不匹配时引发错误。 查询架构:[f0: RAW('org.apache.flink.types.Row', '...')]
相同的代码适用于 POJO 和元组,但我有超过 25 列,并且 POJO 没有任何其他用途 - 所以我希望它可以被通用字段序列替换(Row 声称成为)。 如何使用 Row 输入到数据库中?给出的示例仅显示它用于静态数据流和数据库输出。
我认为如果将其更改为这样的内容(当然是在调整列名称和类型之后),效果会更好:
DataStream<Row> kafkaRows = kafkaEvents
.map(new MyKafkaRecordToRowMapper())
.returns(Types.ROW_NAMED(
new String[] {"id", "quota", "ts", ...},
Types.STRING,
Types.LONG,
TypeInformation.of(Instant.class)),
...);