在 Flink 中加入连续查询 SQL

Joining continuous queries in Flink SQL

我正在尝试连接两个连续查询,但 运行 出现以下错误:

Rowtime attributes must not be in the input rows of a regular join. As a workaround you can cast the time attributes of input tables to TIMESTAMP before.\nPlease check the documentation for the set of currently supported SQL features.

table 定义如下:

CREATE TABLE `Combined` (
    `machineID` STRING,
    `cycleID` BIGINT,
    `start` TIMESTAMP(3),
    `end` TIMESTAMP(3),
    WATERMARK FOR `end` AS `end` - INTERVAL '5' SECOND,
    `sensor1` FLOAT,
    `sensor2` FLOAT
)

和插入查询

INSERT INTO `Combined` 
SELECT
    a.`MachineID`,
    a.`cycleID`,
    MAX(a.`start`) `start`,
    MAX(a.`end`) `end`,
    MAX(a.`sensor1`) `sensor1`,
    MAX(m.`sensor2`) `sensor2`
FROM `Aggregated` a, `MachineStatus` m
WHERE 
    a.`MachineID` = m.`MachineID` AND 
    a.`cycleID` = m.`cycleID` AND 
    a.`start` = m.`timestamp`
GROUP BY a.`MachineID`, a.`cycleID`, SESSION(a.`start`, INTERVAL '1' SECOND)

来源table中的AggregatedMachineStatus中,starttimestamp列是带有水印的时间属性。

我试过将连接的输入行转换为时间戳,但这并没有解决问题,这意味着我不能使用 SESSION,这应该确保只有一个数据点每个周期都有记录。

非常感谢任何帮助!

我进一步调查了一下,发现 GROUP BY 语句在那种情况下没有意义。

此外,SESSION 可以用时间 window 代替,这是更惯用的方法。

INSERT INTO `Combined` 
SELECT
    a.`MachineID`,
    a.`cycleID`,
    a.`start`,
    a.`end`,
    a.`sensor1`,
    m.`sensor2`
FROM `Aggregated` a, `MachineStatus` m
WHERE 
    a.`MachineID` = m.`MachineID` AND 
    a.`cycleID` = m.`cycleID` AND 
    m.`timestamp` BETWEEN a.`start` AND a.`start` + INTERVAL '0' SECOND

为了了解连接动态表的不同方式,我发现 Ververica SQL training 非常有帮助。