使用 DynamicTableSink 时如何将 RowData 转换为 Row

How to convert RowData into Row when using DynamicTableSink

我对 Flink 中新的 sourceSinks 接口有疑问。我目前实现了一个新的自定义 DynamicTableSinkFactory、DynamicTableSink、SinkFunction 和 OutputFormat。我以 JDBC 连接器为例,并使用 Scala。

输入接收器的所有数据都具有行类型。所以OutputFormat序列化是基于Row Interface:

override def writeRecord(record: Row): Unit = {...}

如文档中所述:

records must be accepted as org.apache.flink.table.data.RowData. The framework provides runtime converters such that a sink can still work on common data structures and perform a conversion at the beginning.

这里的目标是保留Row数据结构,只有在插入到SinkFunction中时才将Row转换为RowData。所以这样剩下的代码就不用改了

class MySinkFunction(outputFormat: MyOutputFormat) extends RichSinkFunction[RowData] with CheckpointedFunction 

所以由此产生的问题是:使用DynamicTableSink和OutputFormat时如何将RowData转换为Row?应该在哪里进行转换?

链接: https://ci.apache.org/projects/flink/flink-docs-master/dev/table/sourceSinks.html https://github.com/apache/flink/tree/master/flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc

谢谢。

您可以在org.apache.flink.table.connector.sink.DynamicTableSink#getSinkRuntimeProvider中提供的Context中获取转换器实例。

    // create type information for the DeserializationSchema
    final TypeInformation<RowData> producedTypeInfo =
            context.createTypeInformation(producedDataType);

    // most of the code in DeserializationSchema will not work on internal data structures
    // create a converter for conversion at the end
    final DataStructureConverter converter =
            context.createDataStructureConverter(producedDataType);

该实例是 Java 可序列化的,可以传递到接收器函数中。您还应该在接收器函数中调用 converter.open() 方法。

可以找到一个更复杂的示例 here(对于源,但接收器以类似的方式工作)。查看同一包中的 SocketDynamicTableSourceChangelogCsvFormat