Flink SQL 查询不返回结果

Flink SQL Query not returning results

我在 flink 作业中使用了以下 SQL 查询。 mysql_table 是使用 JDBC 连接器创建的,kafa_source table 是从传入的 kafka 流创建的。

SELECT T.event_id, T.event_name, TUMBLE_END(T.event_time, INTERVAL '5' MINUTE) AS event_time,
MAX(T.event_value) AS max_event_value FROM (
SELECT d.event_id, d.event_name, d.event_source_id, d.event_key, s.event_value, s.event_time
FROM kafka_source s JOIN mysql_table FOR SYSTEM_TIME AS OF s.proc_time AS d
ON d.event_id = s.event_id and d.event_name = s.event_name) T
GROUP BY T.event_id, T.event_key, TUMBLE(T.event_time, INTERVAL '5' MINUTE)

我正在两者之间执行临时连接,当我检查 Flink 的 sql-client CLI(使用 flink-faker 测试)时,这 运行 很好。内部查询工作得很好并且正在打印结果。谁能帮我确定查询的问题?

编辑: 我正在寻找像这样超过 5 分钟创建的 TUMBLE 事件的输出

+I ("11", "SPILL_OVER", 2022-04-28T00:30:00", 28.0)
+I ("11", "SPILL_OVER", 2022-04-28T00:35:00", 32.4)
+I ("11", "SPILL_OVER", 2022-04-28T00:40:00", 19.6)
+I ("11", "SPILL_OVER", 2022-04-28T00:45:00", 22.3)

mysql table 的架构是

+-----------------+--------------+
| Field           | Type         |
+-----------------+--------------+
| event_id        | varchar(64)  |
| event_source_id | varchar(255) |
| event_name      | varchar(255) |

和kafka table的方案是

event_id STRING
event_name STRING
event_time TIMESTAMP(9)
event_value DOUBLE

编辑: 我观察到 TUMBLE_ENDPROCTIME() 列一起工作正常,但同样不适用于 event_time。这是分别选择 proc_timeevent_time 的查询的输出。

+I[2022-05-09T14:36:21.078Z, 2022-05-09T14:36:14.163Z]
+I[2022-05-09T14:36:21.079Z, 2022-05-09T14:36:19.170Z]

下面的查询是基于 kafka_source table.

SELECT event_id, event_name, MAX(event_value), TUMBLE_END(proc_time, INTERVAL '2' MINUTE)
FROM kafka_source
GROUP BY event_id, event_name, TUMBLE(proc_time, INTERVAL '2' MINUTE)

并给出以下类似的输出

+I[[11, SPILL_OVER, 2022-05-09T20:10, 0.93]
+I[[12, SPILL_OVER, 2022-05-09T20:10, 0.9]
+I[[11, PRAXY, 2022-05-09T20:12, 0.91]

当我使用 event_time 代替 proc_time 时,相同的查询没有返回任何结果。我在 table 中创建这些列是这样的:

Schema.newBuilder()
    .columnByExpression("proc_time", "PROCTIME()")
    .columnByExpression("event_time", "TO_TIMESTAMP_LTZ(eventTime, 3)")
    .watermark("event_time", "event_time - INTERVAL '20' SECOND")
    .build()

其中 eventTime 只是来自 kafka 主题的传入时间戳值。两个字段的类型相同,TIMESTAMP_LTZ(3)

proc_time TIMESTAMP_LTZ(3) NOT NULL *PROCTIME*,
event_time TIMESTAMP_LTZ(3) *ROWTIME*

我在这里犯了什么错误?

我通过在 table 环境中添加一个设置来解决这个问题。

tableEnv.config.configuration.setString("table.exec.source.idle-timeout", "5000 ms")

我可以在设置超时后立即创建 windows