flink cep sql 事件未触发
flink cep sql Event Not triggering
我在 Flink SQL 中使用 CEP 模式,它按预期工作连接到 Kafka 代理。但是当我连接到基于集群的云 kafka 设置时,Flink CEP 没有触发。这是我的 sql:
create table agent_action_detail
(
agent_id String,
room_id String,
create_time Bigint,
call_type String,
application_id String,
connect_time Bigint,
row_time TIMESTAMP_LTZ(3), WATERMARK for row_time as row_time - INTERVAL '1' MINUTE)
with ('connector'='kafka', 'topic'='agent-action-detail', ...)
然后我以 json 格式发送消息,例如
{"agent_id":"agent_221","room_id":"room1","create_time":1635206828877,"call_type":"inbound","application_id":"app1","connect_time":1635206501735,"row_time":"2021-10-25 16:07:09.019Z"}
在 flink web ui 中,水印效果很好
flink web ui
我运行我的cepsql:
select * from agent_action_detail
match_recognize(
partition by agent_id
order by row_time
measures
last(BF.create_time) as create_time,
first(AF.connect_time) as connect_time
one row per match AFTER MATCH SKIP PAST LAST ROW
pattern (BF+ AF) define BF as BF.connect_time > 0 ,AF as AF.connect_time > 0
)
每条 kafka 消息,connect_time > 0,但 flink 未触发。
有人可以帮助解决这个问题,在此先感谢!
select * from agent_action_detail match_recognize( partition by agent_id order by row_time measures AF.connect_time as connect_time one row per match pattern (BF AF) WITHIN INTERVAL '1' second define BF as (last(BF.connect_time, 1) < 1), AF as AF.connect_time >= 100)
这是另一个 cep sql 仍然无法正常工作。
agent_action_detail table 被另一个 flink sql 插入为
insert into agent_action_detail select data.agent_id, data.room_id, data.create_time, data.call_type, data.application_id, data.connect_time, now() from source_table where type = 'xxx'
有几种情况会导致模式匹配不产生结果:
- 输入实际上不包含模式
- 水印处理不正确
- 这种模式在某种程度上是病态的
这个特殊的模式循环,没有退出条件。这种模式不允许模式匹配引擎的内部状态被清除,这将导致问题。
如果你直接使用Flink CEP,我会告诉你
尝试添加 until(condition)
或 within(time)
来限制可能匹配的数量。
使用 MATCH_RECOGNIZE
,看看您是否可以向模式中添加一个不同的终止元素。
更新:由于修改模式后仍然没有结果,您应该确定水印是否是问题的根源。 CEP 依赖于按时间对输入流进行排序,这取决于水印——但前提是您使用的是事件时间。
最简单的测试方法是切换到使用处理时间:
create table agent_action_detail
(
agent_id String,
...
row_time AS PROCTIME()
)
with (...)
如果可行,那么时间戳或水印就是问题所在。例如,如果所有事件都延迟,您将得不到任何结果。对于您的情况,我想知道 row_time 列中是否有任何数据。
如果这没有揭示问题,请分享一个最小的可重现示例,包括观察问题所需的数据。
我在 Flink SQL 中使用 CEP 模式,它按预期工作连接到 Kafka 代理。但是当我连接到基于集群的云 kafka 设置时,Flink CEP 没有触发。这是我的 sql:
create table agent_action_detail
(
agent_id String,
room_id String,
create_time Bigint,
call_type String,
application_id String,
connect_time Bigint,
row_time TIMESTAMP_LTZ(3), WATERMARK for row_time as row_time - INTERVAL '1' MINUTE)
with ('connector'='kafka', 'topic'='agent-action-detail', ...)
然后我以 json 格式发送消息,例如
{"agent_id":"agent_221","room_id":"room1","create_time":1635206828877,"call_type":"inbound","application_id":"app1","connect_time":1635206501735,"row_time":"2021-10-25 16:07:09.019Z"}
在 flink web ui 中,水印效果很好 flink web ui
我运行我的cepsql:
select * from agent_action_detail
match_recognize(
partition by agent_id
order by row_time
measures
last(BF.create_time) as create_time,
first(AF.connect_time) as connect_time
one row per match AFTER MATCH SKIP PAST LAST ROW
pattern (BF+ AF) define BF as BF.connect_time > 0 ,AF as AF.connect_time > 0
)
每条 kafka 消息,connect_time > 0,但 flink 未触发。 有人可以帮助解决这个问题,在此先感谢!
select * from agent_action_detail match_recognize( partition by agent_id order by row_time measures AF.connect_time as connect_time one row per match pattern (BF AF) WITHIN INTERVAL '1' second define BF as (last(BF.connect_time, 1) < 1), AF as AF.connect_time >= 100)
这是另一个 cep sql 仍然无法正常工作。 agent_action_detail table 被另一个 flink sql 插入为
insert into agent_action_detail select data.agent_id, data.room_id, data.create_time, data.call_type, data.application_id, data.connect_time, now() from source_table where type = 'xxx'
有几种情况会导致模式匹配不产生结果:
- 输入实际上不包含模式
- 水印处理不正确
- 这种模式在某种程度上是病态的
这个特殊的模式循环,没有退出条件。这种模式不允许模式匹配引擎的内部状态被清除,这将导致问题。
如果你直接使用Flink CEP,我会告诉你
尝试添加 until(condition)
或 within(time)
来限制可能匹配的数量。
使用 MATCH_RECOGNIZE
,看看您是否可以向模式中添加一个不同的终止元素。
更新:由于修改模式后仍然没有结果,您应该确定水印是否是问题的根源。 CEP 依赖于按时间对输入流进行排序,这取决于水印——但前提是您使用的是事件时间。
最简单的测试方法是切换到使用处理时间:
create table agent_action_detail
(
agent_id String,
...
row_time AS PROCTIME()
)
with (...)
如果可行,那么时间戳或水印就是问题所在。例如,如果所有事件都延迟,您将得不到任何结果。对于您的情况,我想知道 row_time 列中是否有任何数据。
如果这没有揭示问题,请分享一个最小的可重现示例,包括观察问题所需的数据。