Flink temporal join not showing data

I'm trying to replicate the temporal join example from the Flink docs, but no results are displayed, and there are no errors either.

My tables:

CREATE TABLE currency_rates (
  `currency_code` STRING,
  `eur_rate` DECIMAL(6,4),
  `rate_time` TIMESTAMP(3) METADATA FROM 'timestamp',
  WATERMARK FOR `rate_time` AS rate_time - INTERVAL '15' SECONDS,
  PRIMARY KEY (currency_code) NOT ENFORCED
) WITH (
  'connector' = 'upsert-kafka',
  'topic' = 'currency_rates',
  'properties.bootstrap.servers' = '<my-server>',
  'key.format' = 'raw',
  'value.format' = 'json'
);
CREATE TABLE transactions (
  `id` STRING,
  `currency_code` STRING,
  `total` DECIMAL(10,2),
  `transaction_time` TIMESTAMP(3),
  WATERMARK FOR `transaction_time` AS transaction_time - INTERVAL '30' SECONDS
) WITH (
  'connector' = 'kafka',
  'topic' = 'transactions',
  'properties.bootstrap.servers' = '<my-server>',
  'key.format' = 'raw',
  'key.fields' = 'id',
  'value.format' = 'json'
);

Inserts:

INSERT into currency_rates 
VALUES ('EURO', 0.0139, TO_TIMESTAMP('2022-01-12 12:37:00', 'yyyy-MM-dd HH:mm:ss'));

INSERT into currency_rates 
VALUES ('CAD', 0.03101, TO_TIMESTAMP('2022-01-12 12:37:00', 'yyyy-MM-dd HH:mm:ss'));

INSERT into transactions 
VALUES ('001', 'EURO', 9.10, TO_TIMESTAMP('2022-01-12 12:50:00', 'yyyy-MM-dd HH:mm:ss'));

INSERT into transactions 
VALUES ('002', 'CAD', 5.20, TO_TIMESTAMP('2022-01-12 12:51:10', 'yyyy-MM-dd HH:mm:ss'));

INSERT into transactions 
VALUES ('003', 'EURO', 12.12, TO_TIMESTAMP('2022-01-12 12:52:10', 'yyyy-MM-dd HH:mm:ss'));

INSERT into transactions 
VALUES ('004', 'CAD', 13.13, TO_TIMESTAMP('2022-01-12 12:53:20', 'yyyy-MM-dd HH:mm:ss'));

Join query:

SELECT 
  t.id,
  t.total * c.eur_rate AS total_eur,
  t.total, 
  c.currency_code,
  t.transaction_time
FROM transactions t
JOIN currency_rates FOR SYSTEM_TIME AS OF t.transaction_time AS c
ON t.currency_code = c.currency_code;

The join query doesn't display any results, and I can't find any working examples out there.

What am I missing to make this temporal join work?

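The problem has to do with watermarks. A temporal join doesn't produce an updating/retraction stream, so before it can emit the final result for the first transaction, it has to wait for evidence (a watermark) that the currency_rates stream is complete up to that transaction's time. (The same applies to each subsequent transaction.)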
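One way to check whether watermarks are the issue is to look at the watermark each source has actually reached. This is a sketch assuming Flink 1.15 or later, where the built-in CURRENT_WATERMARK function is available; it returns NULL while a source has not emitted any watermark yet. The join can only emit a transaction once both watermarks have passed its transaction_time.

```sql
-- Diagnostic sketch (assumes Flink 1.15+ for CURRENT_WATERMARK).
-- Shows the watermark of each source as seen when each row is processed;
-- NULL means that source has not emitted a watermark yet.
SELECT
  currency_code,
  rate_time,
  CURRENT_WATERMARK(rate_time) AS rates_watermark
FROM currency_rates;

SELECT
  id,
  transaction_time,
  CURRENT_WATERMARK(transaction_time) AS tx_watermark
FROM transactions;
```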

If you add:

INSERT into currency_rates 
VALUES ('EURO', 0.0130, TO_TIMESTAMP('2022-01-12 13:00:00', 'yyyy-MM-dd HH:mm:ss'));

that should be enough to flush out some results.

If that doesn't solve it, the per-partition watermarking that some sources (including Kafka) use could be the problem. You can address this either by ensuring that every Kafka partition has some data, or by setting the table.exec.source.idle-timeout configuration parameter so that idle partitions don't hold back the watermark indefinitely.
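In the SQL client, the idle timeout can be set before submitting the join query. This is a minimal sketch assuming the Flink 1.14+ SQL client syntax (quoted keys); the value '10 s' is an arbitrary example, not a recommended setting. The default is 0, which disables idleness detection.

```sql
-- Sketch: mark a source partition idle after 10 seconds without data,
-- so it no longer holds back the watermark (example value only).
SET 'table.exec.source.idle-timeout' = '10 s';
```

Run the SET statement in the same session, then resubmit the join query so the new setting is picked up.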

For my own setup, I also had to add this property to the transactions table:

'scan.startup.mode' = 'earliest-offset',

Without this change to the table DDL, I got an error. (For more on this, see .)
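For reference, this is where that property goes in the transactions DDL from the question; everything except the scan.startup.mode line is unchanged. It is a sketch of one working configuration, not the only possible one.

```sql
CREATE TABLE transactions (
  `id` STRING,
  `currency_code` STRING,
  `total` DECIMAL(10,2),
  `transaction_time` TIMESTAMP(3),
  WATERMARK FOR `transaction_time` AS transaction_time - INTERVAL '30' SECONDS
) WITH (
  'connector' = 'kafka',
  'topic' = 'transactions',
  'properties.bootstrap.servers' = '<my-server>',
  -- read the topic from the beginning so the rows inserted earlier are seen
  'scan.startup.mode' = 'earliest-offset',
  'key.format' = 'raw',
  'key.fields' = 'id',
  'value.format' = 'json'
);
```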

These are the results I got from the temporal join (with c.eur_rate also selected):

id    eur_rate   total_eur    total   currency_code   transaction_time
001   0.0139     0.126490      9.10   EURO            2022-01-12 12:50:00.000
003   0.0139     0.168468     12.12   EURO            2022-01-12 12:52:10.000
002   0.0310     0.161200      5.20   CAD             2022-01-12 12:51:10.000
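The watermark arithmetic is consistent with these results, assuming single-partition topics and that the join's watermark is the minimum of its two input watermarks (millisecond-level offsets are ignored here). After the extra EURO rate at 13:00:00, each side's watermark is its latest timestamp minus its declared delay:

```latex
\begin{aligned}
W_{\text{rates}} &= \text{13:00:00} - 15\,\text{s} = \text{12:59:45} \\
W_{\text{tx}}    &= \text{12:53:20} - 30\,\text{s} = \text{12:52:50} \\
W_{\text{join}}  &= \min(W_{\text{rates}},\, W_{\text{tx}}) = \text{12:52:50}
\end{aligned}
```

Transactions 001 (12:50:00), 002 (12:51:10) and 003 (12:52:10) fall before 12:52:50 and are emitted, while 004 (12:53:20) is still waiting for the transactions watermark to advance.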