How to map java LocalDateTime to Flink TIMESTAMP when using table API

My code looks like this:

DataStreamSource<Tuple2<String, LocalDateTime>> src = ...;
tableEnv.createTemporaryView("input_table", src, $("name"), $("dt"));

Then, after trying to call date_format on it, I realized that the field dt is not a TIMESTAMP.

So I updated the code:

DataStreamSource<Tuple2<String, LocalDateTime>> src = ...;
RowTypeInfo rti = new RowTypeInfo(
    new TypeInformation[] {BasicTypeInfo.STRING_TYPE_INFO, LocalTimeTypeInfo.LOCAL_DATE_TIME},
    new String[] {"name", "dt"});
SingleOutputStreamOperator<Row> rows = src.map(val -> Row.of(val.f0, val.f1)).returns(rti);
tableEnv.createTemporaryView("input_table", rows);

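The new code works, but it feels a bit convoluted to me, since I have to add a map operation that essentially does nothing.

So my question is: what is the proper way to map a Java LocalDateTime to a Flink TIMESTAMP?

I'm using Flink 1.13.0.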

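When converting a DataStream into a Table, you can pass an org.apache.flink.table.api.Schema that adjusts the mapping between Java types and SQL types and declares metadata such as watermarks.

This code works in my case: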

import java.time.LocalDateTime;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Schema;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

...

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

    DataStream<Tuple2<String, LocalDateTime>> dataStream = env.fromElements(
        Tuple2.of("Alice", LocalDateTime.parse("2021-11-16T08:19:30.123")),
        Tuple2.of("Bob", LocalDateTime.parse("2021-11-16T08:19:31.123")),
        Tuple2.of("John", LocalDateTime.parse("2021-11-16T08:19:32.123")));

    // note that "f0" and "f1" here come from the field names in Tuple2
    Table inputTable = tableEnv.fromDataStream(dataStream,
        Schema.newBuilder()
            .column("f0", "STRING")
            .column("f1", "TIMESTAMP(3)")
            .watermark("f1", "SOURCE_WATERMARK()")
            .build()
    );
    tableEnv.createTemporaryView("input_table", inputTable);

    tableEnv.executeSql("DESCRIBE input_table").print();

    tableEnv.executeSql("" +
        "  SELECT                                      " +
        "    UPPER(f0) AS name,                        " +
        "    f1 AS datetime,                           " +
        "    date_format(f1, 'yyyy') AS event_year     " +
        "  FROM input_table                            "
    ).print();

This prints:

+------+------------------------+------+-----+--------+--------------------+
| name |                   type | null | key | extras |          watermark |
+------+------------------------+------+-----+--------+--------------------+
|   f0 |                 STRING | true |     |        |                    |
|   f1 | TIMESTAMP(3) *ROWTIME* | true |     |        | SOURCE_WATERMARK() |
+------+------------------------+------+-----+--------+--------------------+

+----+--------------------------------+-------------------------+--------------------------------+
| op |                           name |                datetime |                     event_year |
+----+--------------------------------+-------------------------+--------------------------------+
| +I |                          ALICE | 2021-11-16 08:19:30.123 |                           2021 |
| +I |                            BOB | 2021-11-16 08:19:31.123 |                           2021 |
| +I |                           JOHN | 2021-11-16 08:19:32.123 |                           2021 |
+----+--------------------------------+-------------------------+--------------------------------+

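I find DESCRIBE very handy for debugging cases like these.

See the documentation for more details: https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/dev/table/data_stream_api/#converting-between-datastream-and-table

Note that the conversion between DataStream and Table was improved in 1.13, and the syntax changed somewhat. That is what the "legacy" section of that page refers to. You may stumble upon examples of the legacy syntax in older Stack Overflow posts.

It may also help to check the correspondence between Java types and SQL types described here: https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/types/#data-type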
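A caveat about the date_format call in the query: Flink documents its pattern string as compatible with Java's SimpleDateFormat, and in those patterns 'YYYY' is the week-based year while 'yyyy' is the calendar year. They agree for the sample dates above but can differ in the last days of December. The stdlib-only sketch below shows the difference; the class and method names are illustrative, and it assumes the US locale's week rules (weeks start on Sunday, week 1 is the week containing January 1).

```java
import java.text.SimpleDateFormat;
import java.time.LocalDate;
import java.time.ZoneOffset;
import java.util.Date;
import java.util.Locale;
import java.util.TimeZone;

public class WeekYearDemo {

    // Turns an ISO day such as "2019-12-30" into a java.util.Date at midnight UTC.
    static Date utcMidnight(String isoDay) {
        return Date.from(LocalDate.parse(isoDay).atStartOfDay(ZoneOffset.UTC).toInstant());
    }

    // Formats a date with a SimpleDateFormat pattern, pinning the locale (which
    // decides the week rules) and the time zone so the result is reproducible.
    static String format(Date date, String pattern) {
        SimpleDateFormat fmt = new SimpleDateFormat(pattern, Locale.US);
        fmt.setTimeZone(TimeZone.getTimeZone("UTC"));
        return fmt.format(date);
    }

    public static void main(String[] args) {
        for (String day : new String[] {"2021-11-16", "2019-12-30"}) {
            Date d = utcMidnight(day);
            // "yyyy" = calendar year, "YYYY" = week-based year
            System.out.println(day + "  yyyy=" + format(d, "yyyy") + "  YYYY=" + format(d, "YYYY"));
        }
    }
}
```

For 2021-11-16 both patterns give 2021. For 2019-12-30, 'yyyy' gives 2019 but 'YYYY' gives 2020, because that Monday falls in the week containing January 1, 2020. So 'yyyy' is usually what you want for an event year.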
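As a quick reminder of that correspondence when writing the column types for Schema.newBuilder().column(...), here is a hypothetical stdlib-only lookup table of a few SQL types and the Java class the documentation lists as each type's default conversion class. It is a sketch covering only a handful of types, not a Flink API; precision such as the "(3)" in TIMESTAMP(3) still has to be chosen by hand.

```java
import java.math.BigDecimal;
import java.time.Instant;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.LocalTime;
import java.util.HashMap;
import java.util.Map;

public class SqlTypeLookup {

    // Hypothetical table: Java class -> SQL type that uses it as its default conversion class.
    private static final Map<Class<?>, String> DEFAULTS = new HashMap<>();

    static {
        DEFAULTS.put(String.class, "STRING");
        DEFAULTS.put(Boolean.class, "BOOLEAN");
        DEFAULTS.put(Integer.class, "INT");
        DEFAULTS.put(Long.class, "BIGINT");
        DEFAULTS.put(Double.class, "DOUBLE");
        DEFAULTS.put(BigDecimal.class, "DECIMAL");
        DEFAULTS.put(LocalDate.class, "DATE");
        DEFAULTS.put(LocalTime.class, "TIME");
        DEFAULTS.put(LocalDateTime.class, "TIMESTAMP");  // wall-clock time without a zone
        DEFAULTS.put(Instant.class, "TIMESTAMP_LTZ");    // point on the time line
    }

    // Returns the SQL type name for a Java class, or throws if the class is not in the table.
    static String sqlTypeFor(Class<?> javaClass) {
        String sqlType = DEFAULTS.get(javaClass);
        if (sqlType == null) {
            throw new IllegalArgumentException("No mapping for " + javaClass.getName());
        }
        return sqlType;
    }

    public static void main(String[] args) {
        // Column types for the Tuple2<String, LocalDateTime> fields f0 and f1,
        // with millisecond precision added to the timestamp as in the answer above.
        System.out.println("f0 -> " + sqlTypeFor(String.class));
        System.out.println("f1 -> " + sqlTypeFor(LocalDateTime.class) + "(3)");
    }
}
```

Running main prints "f0 -> STRING" and "f1 -> TIMESTAMP(3)", which are the strings passed to .column(...) in the answer's schema.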