How to map Java LocalDateTime to Flink TIMESTAMP when using the Table API
My code looks like this:
DataStreamSource<Tuple2<String, LocalDateTime>> src = ...;
tableEnv.createTemporaryView("input_table", src, $("name"), $("dt"));
After trying to call date_format, I realized that the field dt is not a TIMESTAMP.
So I updated the code:
DataStreamSource<Tuple2<String, LocalDateTime>> src = ...;
RowTypeInfo rti = new RowTypeInfo(new TypeInformation[] {BasicTypeInfo.STRING_TYPE_INFO, LocalTimeTypeInfo.LOCAL_DATE_TIME}, new String[] {"name", "dt"});
SingleOutputStreamOperator<Row> rows = src.map(val -> Row.of(val.f0, val.f1)).returns(rti);
tableEnv.createTemporaryView("input_table", rows);
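The new code works, but it feels a bit convoluted to me, because I have to add a map that basically does nothing.
So my question is: what is the proper way to map Java LocalDateTime to Flink TIMESTAMP?
I am using Flink 1.13.0.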
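When converting a DataStream to a Table, we have the opportunity to specify an org.apache.flink.table.api.Schema to adjust the mapping between Java types and SQL types, as well as to declare metadata such as watermarks.
This code works for my case: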
import java.time.LocalDateTime;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Schema;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
...
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
DataStream<Tuple2<String, LocalDateTime>> dataStream = env.fromElements(
    Tuple2.of("Alice", LocalDateTime.parse("2021-11-16T08:19:30.123")),
    Tuple2.of("Bob", LocalDateTime.parse("2021-11-16T08:19:31.123")),
    Tuple2.of("John", LocalDateTime.parse("2021-11-16T08:19:32.123")));
// note that "f0" and "f1" here come from the field names in Tuple2
Table inputTable = tableEnv.fromDataStream(dataStream,
    Schema.newBuilder()
        .column("f0", "STRING")
        .column("f1", "TIMESTAMP(3)")
        .watermark("f1", "SOURCE_WATERMARK()")
        .build()
);
tableEnv.createTemporaryView("input_table", inputTable);
tableEnv.executeSql("DESCRIBE input_table").print();
// 'yyyy' is the calendar year; 'YYYY' would select the week-based year
tableEnv.executeSql("" +
    " SELECT " +
    " UPPER(f0) AS name, " +
    " f1 AS datetime, " +
    " date_format(f1, 'yyyy') AS event_year " +
    " FROM input_table "
).print();
This prints:
+------+------------------------+------+-----+--------+--------------------+
| name | type | null | key | extras | watermark |
+------+------------------------+------+-----+--------+--------------------+
| f0 | STRING | true | | | |
| f1 | TIMESTAMP(3) *ROWTIME* | true | | | SOURCE_WATERMARK() |
+------+------------------------+------+-----+--------+--------------------+
and
+----+--------------------------------+-------------------------+--------------------------------+
| op | name | datetime | event_year |
+----+--------------------------------+-------------------------+--------------------------------+
| +I | ALICE | 2021-11-16 08:19:30.123 | 2021 |
| +I | BOB | 2021-11-16 08:19:31.123 | 2021 |
| +I | JOHN | 2021-11-16 08:19:32.123 | 2021 |
+----+--------------------------------+-------------------------+--------------------------------+
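One caveat on the pattern passed to date_format: as far as I know, Flink's docs describe the format string as compatible with Java's SimpleDateFormat, where lowercase 'y' is the calendar year and uppercase 'Y' is the week-based year. The two agree for the sample rows above but can differ around New Year. Below is a minimal sketch of that difference using plain JDK DateTimeFormatter, not Flink itself; the class name YearPatternDemo and the helper format are made up for illustration, and Locale.US is an assumption to keep the week definition fixed.

```java
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.Locale;

public class YearPatternDemo {

    // Hypothetical helper: formats a timestamp with the given pattern.
    // A fixed locale is used so the week definition does not depend on
    // the default locale of the machine running the code.
    public static String format(LocalDateTime ts, String pattern) {
        return DateTimeFormatter.ofPattern(pattern, Locale.US).format(ts);
    }

    public static void main(String[] args) {
        // Same kind of value as in the sample data: both patterns agree.
        LocalDateTime sample = LocalDateTime.parse("2021-11-16T08:19:30.123");
        System.out.println(format(sample, "yyyy") + " " + format(sample, "YYYY"));

        // Last day of 2019 falls in the week that contains 2020-01-01,
        // so the week-based year is already 2020.
        LocalDateTime yearEnd = LocalDateTime.parse("2019-12-31T23:59:59.999");
        System.out.println(format(yearEnd, "yyyy") + " " + format(yearEnd, "YYYY"));
    }
}
```

If you only want the calendar year of the timestamp, 'yyyy' is the safer choice.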
I find DESCRIBE very handy for debugging cases like this.
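Note that the conversion between DataStream and Table was improved in 1.13, and the syntax changed somewhat. That is what the "legacy" section of that documentation refers to. You may stumble upon examples of the legacy syntax in older SO posts.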
It may also help to check the correspondence between Java types and SQL types described here: https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/types/#data-type