Flink temporal join not showing data
I'm trying to reproduce the temporal join example from the Flink docs, but no results show up. There are no errors either.
My tables:
CREATE TABLE currency_rates (
  `currency_code` STRING,
  `eur_rate` DECIMAL(6,4),
  `rate_time` TIMESTAMP(3) METADATA FROM 'timestamp',
  WATERMARK FOR `rate_time` AS rate_time - INTERVAL '15' SECONDS,
  PRIMARY KEY (currency_code) NOT ENFORCED
) WITH (
  'connector' = 'upsert-kafka',
  'topic' = 'currency_rates',
  'properties.bootstrap.servers' = '<my-server>',
  'key.format' = 'raw',
  'value.format' = 'json'
);
CREATE TABLE transactions (
  `id` STRING,
  `currency_code` STRING,
  `total` DECIMAL(10,2),
  `transaction_time` TIMESTAMP(3),
  WATERMARK FOR `transaction_time` AS transaction_time - INTERVAL '30' SECONDS
) WITH (
  'connector' = 'kafka',
  'topic' = 'transactions',
  'properties.bootstrap.servers' = '<my-server>',
  'key.format' = 'raw',
  'key.fields' = 'id',
  'value.format' = 'json'
);
Inserts:
INSERT into currency_rates
VALUES ('EURO', 0.0139, TO_TIMESTAMP('2022-01-12 12:37:00', 'yyyy-MM-dd HH:mm:ss'));
INSERT into currency_rates
VALUES ('CAD', 0.03101, TO_TIMESTAMP('2022-01-12 12:37:00', 'yyyy-MM-dd HH:mm:ss'));
INSERT into transactions
VALUES ('001', 'EURO', 9.10, TO_TIMESTAMP('2022-01-12 12:50:00', 'yyyy-MM-dd HH:mm:ss'));
INSERT into transactions
VALUES ('002', 'CAD', 5.20, TO_TIMESTAMP('2022-01-12 12:51:10', 'yyyy-MM-dd HH:mm:ss'));
INSERT into transactions
VALUES ('003', 'EURO', 12.12, TO_TIMESTAMP('2022-01-12 12:52:10', 'yyyy-MM-dd HH:mm:ss'));
INSERT into transactions
VALUES ('004', 'CAD', 13.13, TO_TIMESTAMP('2022-01-12 12:53:20', 'yyyy-MM-dd HH:mm:ss'));
Join query:
SELECT
  t.id,
  t.total * c.eur_rate AS total_eur,
  t.total,
  c.currency_code,
  t.transaction_time
FROM transactions t
JOIN currency_rates FOR SYSTEM_TIME AS OF t.transaction_time AS c
  ON t.currency_code = c.currency_code;
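The join query shows no results, and I couldn't find any working examples out there.
What am I missing to get this temporal join to work?
The problem is related to watermarks. A temporal join does not produce an updating/retraction stream, so it has to wait for evidence that the currency_rates stream is complete up to the time of the first transaction before it can produce a final result for that transaction. (And so on for subsequent transactions.)
If you add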
INSERT into currency_rates
VALUES ('EURO', 0.0130, TO_TIMESTAMP('2022-01-12 13:00:00', 'yyyy-MM-dd HH:mm:ss'));
that should be enough to flush out some results.
If that doesn't fix it, then the per-partition watermarking that some sources (including Kafka) use could be the problem. You can address this either by ensuring that every Kafka partition has some data, or by setting the table.exec.source.idle-timeout configuration parameter so that idle partitions don't hold back the watermark indefinitely.
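As a rough sketch of the second option: in the SQL client, the option (spelled table.exec.source.idle-timeout in the Flink configuration) can be set per session before starting the join. The 5-second value below is an arbitrary choice for illustration, and the quoted SET syntax assumes Flink 1.13 or later. The diagnostic query that follows is optional and assumes Flink 1.14 or later, where the CURRENT_WATERMARK function is available.

```sql
-- Run in the SQL client before starting the join query.
-- The 5-second timeout is an arbitrary value chosen for illustration.
SET 'table.exec.source.idle-timeout' = '5 s';

-- Optional check (assumes Flink 1.14+): shows the watermark seen as each row is read.
-- A watermark column that stays NULL or never advances suggests a stalled watermark.
SELECT
  id,
  transaction_time,
  CURRENT_WATERMARK(transaction_time) AS wm
FROM transactions;
```

If the watermark column stays NULL even with the timeout set, the cause is probably something other than idle partitions.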
In my case, I also had to add this property to the transactions table:
'scan.startup.mode' = 'earliest-offset',
Without this change to the table DDL, I was getting an error. (For more on this, see .)
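For reference, this is roughly how the transactions DDL from the question would look with that property in place. Everything except the added 'scan.startup.mode' line is copied from the question. My guess is that the error comes from the connector's default startup mode (group-offsets) expecting a consumer group id, but I have not confirmed that.

```sql
CREATE TABLE transactions (
  `id` STRING,
  `currency_code` STRING,
  `total` DECIMAL(10,2),
  `transaction_time` TIMESTAMP(3),
  WATERMARK FOR `transaction_time` AS transaction_time - INTERVAL '30' SECONDS
) WITH (
  'connector' = 'kafka',
  'topic' = 'transactions',
  'properties.bootstrap.servers' = '<my-server>',
  -- Added: read the topic from the beginning instead of from committed group offsets.
  'scan.startup.mode' = 'earliest-offset',
  'key.format' = 'raw',
  'key.fields' = 'id',
  'value.format' = 'json'
);
```

Reading from the earliest offset also means rows inserted before the query was started are picked up, which is convenient for a small test like this one.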
These are the results I got from the temporal join:
id   eur_rate  total_eur  total  currency_code  transaction_time
001  0.0139    0.126490   9.10   EURO           2022-01-12 12:50:00.000
003  0.0139    0.168468   12.12  EURO           2022-01-12 12:52:10.000
002  0.0310    0.161200   5.20   CAD            2022-01-12 12:51:10.000
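Transaction 004 is absent from this output. A plausible explanation, assuming the join releases a transaction only once the watermark has passed its transaction_time: the transactions watermark trails the latest transaction by 30 seconds, so it stands at 12:53:20 - 30 s = 12:52:50. That covers 001 through 003 but not 004. Under that assumption, a later transaction such as the hypothetical row below should advance the watermark far enough to release 004.

```sql
-- Hypothetical extra row: id '005' and its values are made up for illustration.
-- Its timestamp moves the transactions watermark to 12:55:00 - 30 s = 12:54:30,
-- which is past transaction 004 at 12:53:20.
-- The currency_rates watermark (13:00:00 - 15 s = 12:59:45, after the extra
-- EURO rate above) is already later, so it would not hold the join back.
INSERT INTO transactions
VALUES ('005', 'EURO', 1.00, TO_TIMESTAMP('2022-01-12 12:55:00', 'yyyy-MM-dd HH:mm:ss'));
```

By the same reasoning, row 005 itself would stay pending until an even later transaction arrives. With multiple Kafka partitions, the per-partition watermarking mentioned above may also delay this.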