如何停止 Apache Flink CEP 模式?

How to stop Apache Flink CEP Pattern?

请帮助我,我有两个问题:
我从 Apache Kafka 读取 json-messages,(然后我有步骤:反序列化到 POJO、过滤器、keyBy ....)

  1. 哪个更好用:KeyedProcessFunction(带有状态、定时器、if-else 逻辑块)或 Flink CEP 模式库?

我可以在 KeyedProcessFunction 中检查输入序列(检查状态、if-else 块、out.collect(...)、state.clear()。 ..你会理解我的),以及我可以使用 Flink CEP 库 条件和量化器。

  1. 如何停止flink CEP模式?

例如:
我有输入序列:A1,(无事件 1 分钟)A2,(无事件 5 分钟)-3,(无事件 1 分钟)-4,(无事件超过 5 分钟)A5。 (A1和A5之间可能有很多事件)
我要发送 output:A1、A3、A5。
第一个事件,如果下一个事件在上一个事件后不到 5 分钟内到达,它将不会发送到输出,如果下一个事件在上一个事件后超过 5 分钟内到达,它将发送到输出。
我应该在我的模式中添加什么???

Pattern<Event, ?> pattern = Pattern.
<Event>begin("start")
.where(new SimpleCondition<Event>(){
 public boolean filter(Event event){
return event.getName().contains("A");
}
}).within(Time.minutes(5));

虽然乍一看这个特定示例作为 KeyedProcessFunction 实现起来似乎相当微不足道,但如果消息可能乱序到达,肯定会产生一些复杂性。那么你可能会误以为会有很大的差距,而实际上并没有。

但是,如果您想要一个简单的 out-of-the-box、ready-made 解决方案,这个特定示例非常适合会话 windows。

对于 CEP,我认为一个可行的解决方案应该是这样的:您正在寻找一个 A(称为 A1)紧接着另一个 A(称为 A2)的序列,其中 (A2.timestamp - A1.timestamp) >= 5 分钟。找到匹配项后,发出 A1 并推进匹配引擎,使 A2 成为新的 A1。 (方便的是,CEP pre-sorts 输入流,因此您不必担心 out-of-order。)