Flink temporal join works only for a few seconds
I'm trying to implement an event-time temporal join in Flink. Here is the first table in the join:
tEnv.executeSql("CREATE TABLE AggregatedTrafficData_Kafka (" +
"`timestamp` TIMESTAMP_LTZ(3)," +
"`area` STRING," +
"`networkEdge` STRING," +
"`vehiclesNumber` BIGINT," +
"`averageSpeed` INTEGER," +
"WATERMARK FOR `timestamp` AS `timestamp`" +
") WITH (" +
"'connector' = 'kafka'," +
"'topic' = 'seneca.trafficdata.aggregated'," +
"'properties.bootstrap.servers' = 'localhost:9092'," +
"'properties.group.id' = 'traffic-data-aggregation-job'," +
"'format' = 'json'," +
"'json.timestamp-format.standard' = 'ISO-8601'" +
")");
This table is used as a sink for the following query:
Table aggregatedTrafficData = trafficData
.window(Slide.over(lit(30).seconds())
.every(lit(15).seconds())
.on($("timestamp"))
.as("w"))
.groupBy($("w"), $("networkEdge"), $("area"))
.select(
$("w").end().as("timestamp"),
$("area"),
$("networkEdge"),
$("plate").count().as("vehiclesNumber"),
$("speed").avg().as("averageSpeed")
);
Here is the other table in the join. I'm using Debezium to stream a Postgres table into Kafka:
tEnv.executeSql("CREATE TABLE TransportNetworkEdge_Kafka (" +
"`timestamp` TIMESTAMP_LTZ(3) METADATA FROM 'value.source.timestamp' VIRTUAL," +
"`urn` STRING," +
"`flow_rate` INTEGER," +
"PRIMARY KEY(`urn`) NOT ENFORCED," +
"WATERMARK FOR `timestamp` AS `timestamp`" +
") WITH (" +
"'connector' = 'kafka'," +
"'topic' = 'seneca.network.transport_network_edge'," +
"'scan.startup.mode' = 'latest-offset'," +
"'properties.bootstrap.servers' = 'localhost:9092'," +
"'properties.group.id' = 'traffic-data-aggregation-job'," +
"'format' = 'debezium-json'," +
"'debezium-json.schema-include' = 'true'" +
")");
And finally, the temporal join:
Table transportNetworkCongestion = tEnv.sqlQuery("SELECT AggregatedTrafficData_Kafka.`timestamp`, `networkEdge`, " +
"congestion(`vehiclesNumber`, `flow_rate`) AS `congestion` FROM AggregatedTrafficData_Kafka " +
"JOIN TransportNetworkEdge_Kafka FOR SYSTEM_TIME AS OF AggregatedTrafficData_Kafka.`timestamp` " +
"ON AggregatedTrafficData_Kafka.`networkEdge` = TransportNetworkEdge_Kafka.`urn`");
The problem I'm having is that the join only works for the first few seconds (after an update to the Postgres table), but I need the first table to be continuously joined with the Debezium one. Am I doing something wrong?
Thanks
euks
Temporal joins using the AS OF syntax you're using require:
- an append-only table with a valid event-time attribute
- an updating table with a primary key and a valid event-time attribute
- an equality predicate on the primary key
When Flink SQL's temporal operators are applied to event-time streams, watermarks play a crucial role in determining both when results are produced and when state is cleared.
When performing a temporal join:
- rows from the append-only table are buffered in Flink state until the join operator's current watermark reaches their timestamps
- for the versioned table, state retains, for each key, the latest version whose timestamp precedes the join operator's current watermark, plus any versions with timestamps after the current watermark
- whenever the join operator's watermark advances, new results are produced and state that is no longer relevant is cleared
The join operator keeps track of the watermarks it has received from its two input channels, and its current watermark is always the minimum of those two. This is why your join stalls, and only makes progress when flow_rate is updated.
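The buffering and min-of-inputs behavior described above can be sketched as a toy simulation. This is NOT Flink's actual implementation, just an illustration of the semantics: probe rows wait until the operator watermark (the minimum of both input watermarks) passes their timestamps, then each joins the latest version of the keyed side at or before its timestamp.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Toy simulation of event-time temporal join semantics (not Flink internals).
public class TemporalJoinSketch {
    public record Probe(long ts, String key) {}

    private final List<Probe> buffered = new ArrayList<>();
    // key -> (version timestamp -> flow_rate)
    private final Map<String, TreeMap<Long, Integer>> versions = new HashMap<>();
    private long leftWm = Long.MIN_VALUE;   // append-only input watermark
    private long rightWm = Long.MIN_VALUE;  // versioned input watermark
    public final List<String> results = new ArrayList<>();

    public void probe(Probe p) { buffered.add(p); }

    public void version(String key, long ts, int flowRate) {
        versions.computeIfAbsent(key, k -> new TreeMap<>()).put(ts, flowRate);
    }

    public void watermark(boolean leftInput, long wm) {
        if (leftInput) leftWm = wm; else rightWm = wm;
        // The operator watermark is the MINIMUM of the two input watermarks.
        long operatorWm = Math.min(leftWm, rightWm);
        Iterator<Probe> it = buffered.iterator();
        while (it.hasNext()) {
            Probe p = it.next();
            if (p.ts() <= operatorWm) {
                // Latest version at or before the probe row's timestamp.
                Map.Entry<Long, Integer> v = versions
                        .getOrDefault(p.key(), new TreeMap<>())
                        .floorEntry(p.ts());
                if (v != null) {
                    results.add(p.key() + "@" + p.ts() + " flow=" + v.getValue());
                }
                it.remove();
            }
        }
    }

    public static void main(String[] args) {
        TemporalJoinSketch join = new TemporalJoinSketch();
        join.version("edge-1", 0L, 50);
        join.probe(new Probe(10L, "edge-1"));
        join.watermark(true, 100L);       // only the append-only side advances
        System.out.println(join.results); // [] -- the join is stalled
        join.watermark(false, 100L);      // the versioned side finally advances
        System.out.println(join.results); // [edge-1@10 flow=50]
    }
}
```

In your job the versioned side only emits rows (and hence watermark progress) when the Postgres table changes, which is exactly the stalled case in the first `watermark` call above.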
One way to work around this is to define the watermark for the TransportNetworkEdge_Kafka table like this:
"WATERMARK FOR `timestamp` AS " + Watermark.MAX_WATERMARK
This sets the watermark for this table/stream to the largest possible value, which makes the watermark from this stream irrelevant: it will never be the minimum of the two.
This will, however, make the join results non-deterministic.
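Seen through the min-of-inputs rule, the effect of the workaround is just this (a toy sketch, not Flink code):

```java
// Why pinning one side to the maximal watermark unblocks the join: the
// operator watermark is the minimum of its two input watermarks, so a side
// fixed at Long.MAX_VALUE can never hold it back.
public class MaxWatermarkEffect {
    static long operatorWatermark(long appendOnlyWm, long versionedWm) {
        return Math.min(appendOnlyWm, versionedWm);
    }

    public static void main(String[] args) {
        long versioned = Long.MAX_VALUE; // versioned side pinned to the max
        // The operator watermark now simply tracks the append-only side:
        System.out.println(operatorWatermark(15_000L, versioned)); // 15000
        System.out.println(operatorWatermark(30_000L, versioned)); // 30000
    }
}
```

The cost is the non-determinism mentioned above: each probe row now joins against whatever version happens to have arrived by then, rather than a version pinned down by watermark alignment.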