flink cep sql 事件未触发

flink cep sql Event Not triggering

我在 Flink SQL 中使用 CEP 模式,它按预期工作连接到 Kafka 代理。但是当我连接到基于集群的云 kafka 设置时,Flink CEP 没有触发。这是我的 sql:

create table agent_action_detail 
(
    agent_id String, 
    room_id String, 
    create_time Bigint, 
    call_type String, 
    application_id String, 
    connect_time Bigint, 
    row_time TIMESTAMP_LTZ(3), WATERMARK for row_time as row_time  - INTERVAL '1' MINUTE) 
with ('connector'='kafka', 'topic'='agent-action-detail', ...)

然后我以 json 格式发送消息,例如

{"agent_id":"agent_221","room_id":"room1","create_time":1635206828877,"call_type":"inbound","application_id":"app1","connect_time":1635206501735,"row_time":"2021-10-25 16:07:09.019Z"}

在 flink web ui 中,水印效果很好 flink web ui

我运行我的cepsql:

select * from agent_action_detail
 match_recognize(
    partition by agent_id 
    order by row_time 
    measures 
        last(BF.create_time) as create_time, 
        first(AF.connect_time) as connect_time 
    one row per match AFTER MATCH SKIP PAST LAST ROW 
    pattern (BF+ AF) define BF as BF.connect_time > 0 ,AF as AF.connect_time > 0
 )

每条 kafka 消息,connect_time > 0,但 flink 未触发。 有人可以帮助解决这个问题,在此先感谢!


select * from agent_action_detail match_recognize( partition by agent_id order by row_time  measures AF.connect_time as connect_time one row per match pattern (BF AF) WITHIN INTERVAL '1' second define BF as (last(BF.connect_time, 1) < 1), AF as AF.connect_time >= 100)

这是另一个 cep sql 仍然无法正常工作。 agent_action_detail table 被另一个 flink sql 插入为

insert into agent_action_detail select data.agent_id, data.room_id, data.create_time, data.call_type, data.application_id, data.connect_time, now() from source_table where type = 'xxx'

有几种情况会导致模式匹配不产生结果:

  • 输入实际上不包含模式
  • 水印处理不正确
  • 这种模式在某种程度上是病态的

这个特殊的模式循环,没有退出条件。这种模式不允许模式匹配引擎的内部状态被清除,这将导致问题。

如果你直接使用Flink CEP,我会告诉你 尝试添加 until(condition)within(time) 来限制可能匹配的数量。

使用 MATCH_RECOGNIZE,看看您是否可以向模式中添加一个不同的终止元素。


更新:由于修改模式后仍然没有结果,您应该确定水印是否是问题的根源。 CEP 依赖于按时间对输入流进行排序,这取决于水印——但前提是您使用的是事件时间。

最简单的测试方法是切换到使用处理时间:

create table agent_action_detail 
(
    agent_id String, 
    ...
    row_time AS PROCTIME()
)
with (...)

如果可行,那么时间戳或水印就是问题所在。例如,如果所有事件都延迟,您将得不到任何结果。对于您的情况,我想知道 row_time 列中是否有任何数据。


如果这没有揭示问题,请分享一个最小的可重现示例,包括观察问题所需的数据。