Flink SQL 单元测试:如何分配水印?
Flink SQL Unit Testing: How to Assign Watermark?
我正在为使用 match_recognize 的 Flink SQL 语句编写单元测试。我正在像这样设置测试数据
Table data = tEnv.fromValues(DataTypes.ROW(
DataTypes.FIELD("event_time", DataTypes.TIMESTAMP(3)),
DataTypes.FIELD("foobar", DataTypes.STRING()),
....
),
row(...),
row(...)
);
我有两个问题,
- 如何将event_time指定为加水印的字段? (表示行时间)
- 不太重要,给table创建一个有意义的名字?
FLINK 版本:1.11
您遇到了 Table API 的当前限制:无法结合 forValues
方法定义水印和行时间属性;你需要一个连接器。有几个选项可以解决它:
1. 使用与 VALUES
叠加的 csv
连接器,如 this example.[=17= 所示]
2.使用built-inDataGen connector。由于您正在为 CEP 进行单元测试,我想您希望对生成的数据进行某种程度的控制,因此这可能不是一个可行的选择。反正我想提一下。
注意: 使用 SQL DDL 语法是从 Flink 1.10 创建 table 的推荐方法。这将使您尝试做的两件事(即定义水印并命名您的table)更加直接:
tEnv.executeSql("CREATE TABLE table_name (\n" +
" event_time TIMESTAMP(3),\n" +
" foobar STRING \n" +
" WATERMARK FOR event_time AS event_time\n" +
") WITH (...)"
);
Table data = tEnv.from("table_name");
水印声明为计算列,您可以选择使用多种水印策略。请检查 this documentation page 了解更多详情。
我正在为使用 match_recognize 的 Flink SQL 语句编写单元测试。我正在像这样设置测试数据
Table data = tEnv.fromValues(DataTypes.ROW(
DataTypes.FIELD("event_time", DataTypes.TIMESTAMP(3)),
DataTypes.FIELD("foobar", DataTypes.STRING()),
....
),
row(...),
row(...)
);
我有两个问题,
- 如何将event_time指定为加水印的字段? (表示行时间)
- 不太重要,给table创建一个有意义的名字?
FLINK 版本:1.11
您遇到了 Table API 的当前限制:无法结合 forValues
方法定义水印和行时间属性;你需要一个连接器。有几个选项可以解决它:
1. 使用与 VALUES
叠加的 csv
连接器,如 this example.[=17= 所示]
2.使用built-inDataGen connector。由于您正在为 CEP 进行单元测试,我想您希望对生成的数据进行某种程度的控制,因此这可能不是一个可行的选择。反正我想提一下。
注意: 使用 SQL DDL 语法是从 Flink 1.10 创建 table 的推荐方法。这将使您尝试做的两件事(即定义水印并命名您的table)更加直接:
tEnv.executeSql("CREATE TABLE table_name (\n" +
" event_time TIMESTAMP(3),\n" +
" foobar STRING \n" +
" WATERMARK FOR event_time AS event_time\n" +
") WITH (...)"
);
Table data = tEnv.from("table_name");
水印声明为计算列,您可以选择使用多种水印策略。请检查 this documentation page 了解更多详情。