如何在 flink sql table 中自动生成水印?
how to auto generate watermark in flink sql table?
我正在测试 flink cep sql,我的水印被定义为行时间,我的 table 是一个 kafka table。由于水印依赖于所有kafka分区中的最小部分,因此每条新消息都必须等待kafka分区对齐,然后cep触发结果。
我的kafka table(主题有3个分区)定义为
create table test_table(
agent_id String, room_id String,
create_time Bigint,
call_type String,
application_id String,
connect_time Bigint,
row_time as to_timestamp_ltz(create_time, 3),
WATERMARK FOR row_time AS row_time - INTERVAL '5' SECOND
)
这是我的cep sql
select * from test_table match_recognize (
partition by agent_id,room_id,call_type
order by row_time
measures
last(BF.create_time) as create_time,
last(AF.connect_time) as connect_time
one row per match after match SKIP PAST LAST ROW
pattern (BF+ AF) WITHIN INTERVAL '1' HOUR
define
BF as BF.connect_time = 0,
AF as AF.connect_time > 0 and BF.room_id = AF.room_id and BF.call_type = AF.call_type
) as T ;
cep sql 触发结果是正确的,但总是迟到,因为每个分区都需要对齐水印。如何立即或在flink中自动生成水印sqltable?
您的模式要求找到 connect_time > 0
的行,该行紧接在 connect_time = 0
的行之后(其中两行具有相同的 room_id 和 call_type) .要完美正确地完成此模式匹配,有必要等待水印。否则,过早的匹配可能会因乱序事件的到来而失效——例如,connect_time < 0
正好在 AF 之前的事件。 (你可能知道那是不可能的,但 cep/sql 引擎不知道。)
如果您愿意放宽模式匹配语义,为什么不将此 MATCH_RECOGNIZE 查询替换为间隔连接(具有时间约束的自连接)。有关详细信息,请参阅 https://nightlies.apache.org/flink/flink-docs-stable/docs/dev/table/sql/queries/joins/#interval-joins。
BTW,这部分定义AF
... and BF.room_id = AF.room_id and BF.call_type = AF.call_type
没有任何影响,因为流已被 room_id
和 call_type
分割。
我正在测试 flink cep sql,我的水印被定义为行时间,我的 table 是一个 kafka table。由于水印依赖于所有kafka分区中的最小部分,因此每条新消息都必须等待kafka分区对齐,然后cep触发结果。
我的kafka table(主题有3个分区)定义为
create table test_table(
agent_id String, room_id String,
create_time Bigint,
call_type String,
application_id String,
connect_time Bigint,
row_time as to_timestamp_ltz(create_time, 3),
WATERMARK FOR row_time AS row_time - INTERVAL '5' SECOND
)
这是我的cep sql
select * from test_table match_recognize (
partition by agent_id,room_id,call_type
order by row_time
measures
last(BF.create_time) as create_time,
last(AF.connect_time) as connect_time
one row per match after match SKIP PAST LAST ROW
pattern (BF+ AF) WITHIN INTERVAL '1' HOUR
define
BF as BF.connect_time = 0,
AF as AF.connect_time > 0 and BF.room_id = AF.room_id and BF.call_type = AF.call_type
) as T ;
cep sql 触发结果是正确的,但总是迟到,因为每个分区都需要对齐水印。如何立即或在flink中自动生成水印sqltable?
您的模式要求找到 connect_time > 0
的行,该行紧接在 connect_time = 0
的行之后(其中两行具有相同的 room_id 和 call_type) .要完美正确地完成此模式匹配,有必要等待水印。否则,过早的匹配可能会因乱序事件的到来而失效——例如,connect_time < 0
正好在 AF 之前的事件。 (你可能知道那是不可能的,但 cep/sql 引擎不知道。)
如果您愿意放宽模式匹配语义,为什么不将此 MATCH_RECOGNIZE 查询替换为间隔连接(具有时间约束的自连接)。有关详细信息,请参阅 https://nightlies.apache.org/flink/flink-docs-stable/docs/dev/table/sql/queries/joins/#interval-joins。
BTW,这部分定义AF
... and BF.room_id = AF.room_id and BF.call_type = AF.call_type
没有任何影响,因为流已被 room_id
和 call_type
分割。