Flink SQL 查询不返回结果
Flink SQL Query not returning results
我在 flink 作业中使用了以下 SQL 查询。 mysql_table
是使用 JDBC 连接器创建的,kafa_source
table 是从传入的 kafka 流创建的。
SELECT T.event_id, T.event_name, TUMBLE_END(T.event_time, INTERVAL '5' MINUTE) AS event_time,
MAX(T.event_value) AS max_event_value FROM (
SELECT d.event_id, d.event_name, d.event_source_id, d.event_key, s.event_value, s.event_time
FROM kafka_source s JOIN mysql_table FOR SYSTEM_TIME AS OF s.proc_time AS d
ON d.event_id = s.event_id and d.event_name = s.event_name) T
GROUP BY T.event_id, T.event_key, TUMBLE(T.event_time, INTERVAL '5' MINUTE)
我正在两者之间执行临时连接,当我检查 Flink 的 sql-client CLI(使用 flink-faker
测试)时,这 运行 很好。内部查询工作得很好并且正在打印结果。谁能帮我确定查询的问题?
编辑:
我正在寻找像这样超过 5 分钟创建的 TUMBLE 事件的输出
+I ("11", "SPILL_OVER", 2022-04-28T00:30:00", 28.0)
+I ("11", "SPILL_OVER", 2022-04-28T00:35:00", 32.4)
+I ("11", "SPILL_OVER", 2022-04-28T00:40:00", 19.6)
+I ("11", "SPILL_OVER", 2022-04-28T00:45:00", 22.3)
mysql table 的架构是
+-----------------+--------------+
| Field | Type |
+-----------------+--------------+
| event_id | varchar(64) |
| event_source_id | varchar(255) |
| event_name | varchar(255) |
和kafka table的方案是
event_id STRING
event_name STRING
event_time TIMESTAMP(9)
event_value DOUBLE
编辑:
我观察到 TUMBLE_END
与 PROCTIME()
列一起工作正常,但同样不适用于 event_time
。这是分别选择 proc_time
和 event_time
的查询的输出。
+I[2022-05-09T14:36:21.078Z, 2022-05-09T14:36:14.163Z]
+I[2022-05-09T14:36:21.079Z, 2022-05-09T14:36:19.170Z]
下面的查询是基于 kafka_source
table.
SELECT event_id, event_name, MAX(event_value), TUMBLE_END(proc_time, INTERVAL '2' MINUTE)
FROM kafka_source
GROUP BY event_id, event_name, TUMBLE(proc_time, INTERVAL '2' MINUTE)
并给出以下类似的输出
+I[[11, SPILL_OVER, 2022-05-09T20:10, 0.93]
+I[[12, SPILL_OVER, 2022-05-09T20:10, 0.9]
+I[[11, PRAXY, 2022-05-09T20:12, 0.91]
当我使用 event_time
代替 proc_time
时,相同的查询没有返回任何结果。我在 table 中创建这些列是这样的:
Schema.newBuilder()
.columnByExpression("proc_time", "PROCTIME()")
.columnByExpression("event_time", "TO_TIMESTAMP_LTZ(eventTime, 3)")
.watermark("event_time", "event_time - INTERVAL '20' SECOND")
.build()
其中 eventTime
只是来自 kafka 主题的传入时间戳值。两个字段的类型相同,TIMESTAMP_LTZ(3)
proc_time TIMESTAMP_LTZ(3) NOT NULL *PROCTIME*,
event_time TIMESTAMP_LTZ(3) *ROWTIME*
我在这里犯了什么错误?
我通过在 table 环境中添加一个设置来解决这个问题。
tableEnv.config.configuration.setString("table.exec.source.idle-timeout", "5000 ms")
我可以在设置超时后立即创建 windows
我在 flink 作业中使用了以下 SQL 查询。 mysql_table
是使用 JDBC 连接器创建的,kafa_source
table 是从传入的 kafka 流创建的。
SELECT T.event_id, T.event_name, TUMBLE_END(T.event_time, INTERVAL '5' MINUTE) AS event_time,
MAX(T.event_value) AS max_event_value FROM (
SELECT d.event_id, d.event_name, d.event_source_id, d.event_key, s.event_value, s.event_time
FROM kafka_source s JOIN mysql_table FOR SYSTEM_TIME AS OF s.proc_time AS d
ON d.event_id = s.event_id and d.event_name = s.event_name) T
GROUP BY T.event_id, T.event_key, TUMBLE(T.event_time, INTERVAL '5' MINUTE)
我正在两者之间执行临时连接,当我检查 Flink 的 sql-client CLI(使用 flink-faker
测试)时,这 运行 很好。内部查询工作得很好并且正在打印结果。谁能帮我确定查询的问题?
编辑: 我正在寻找像这样超过 5 分钟创建的 TUMBLE 事件的输出
+I ("11", "SPILL_OVER", 2022-04-28T00:30:00", 28.0)
+I ("11", "SPILL_OVER", 2022-04-28T00:35:00", 32.4)
+I ("11", "SPILL_OVER", 2022-04-28T00:40:00", 19.6)
+I ("11", "SPILL_OVER", 2022-04-28T00:45:00", 22.3)
mysql table 的架构是
+-----------------+--------------+
| Field | Type |
+-----------------+--------------+
| event_id | varchar(64) |
| event_source_id | varchar(255) |
| event_name | varchar(255) |
和kafka table的方案是
event_id STRING
event_name STRING
event_time TIMESTAMP(9)
event_value DOUBLE
编辑:
我观察到 TUMBLE_END
与 PROCTIME()
列一起工作正常,但同样不适用于 event_time
。这是分别选择 proc_time
和 event_time
的查询的输出。
+I[2022-05-09T14:36:21.078Z, 2022-05-09T14:36:14.163Z]
+I[2022-05-09T14:36:21.079Z, 2022-05-09T14:36:19.170Z]
下面的查询是基于 kafka_source
table.
SELECT event_id, event_name, MAX(event_value), TUMBLE_END(proc_time, INTERVAL '2' MINUTE)
FROM kafka_source
GROUP BY event_id, event_name, TUMBLE(proc_time, INTERVAL '2' MINUTE)
并给出以下类似的输出
+I[[11, SPILL_OVER, 2022-05-09T20:10, 0.93]
+I[[12, SPILL_OVER, 2022-05-09T20:10, 0.9]
+I[[11, PRAXY, 2022-05-09T20:12, 0.91]
当我使用 event_time
代替 proc_time
时,相同的查询没有返回任何结果。我在 table 中创建这些列是这样的:
Schema.newBuilder()
.columnByExpression("proc_time", "PROCTIME()")
.columnByExpression("event_time", "TO_TIMESTAMP_LTZ(eventTime, 3)")
.watermark("event_time", "event_time - INTERVAL '20' SECOND")
.build()
其中 eventTime
只是来自 kafka 主题的传入时间戳值。两个字段的类型相同,TIMESTAMP_LTZ(3)
proc_time TIMESTAMP_LTZ(3) NOT NULL *PROCTIME*,
event_time TIMESTAMP_LTZ(3) *ROWTIME*
我在这里犯了什么错误?
我通过在 table 环境中添加一个设置来解决这个问题。
tableEnv.config.configuration.setString("table.exec.source.idle-timeout", "5000 ms")
我可以在设置超时后立即创建 windows