在 Flink CEP 中是否有处理多个 "temporal constraints" 的解决方法?

Is there a work-around to handle multiple "temporal constraints" in Flink CEP?

如 CEP 文档 (https://ci.apache.org/projects/flink/flink-docs-release-1.5/dev/libs/cep.html) 中所述,模式序列中只允许一个时间约束,我正在努力寻找一种方法来处理包含 2 个时间约束的业务案例。

我需要监控一些业务事件,并对满足以下规则的事件发出警报:

  1. 注册了一个新账号
  2. 注册后5分钟内完成账号认证
  3. 账户在接下来的1小时内至少完成了2笔交易金额大于1000.00的交易。

代码是这样的:

Pattern<Event, ?> pattern = Pattern.<Event>begin("register").where(new SimpleCondition<Event>() {
    @Override
    public boolean filter<Event value> throws Exception {
        return (value.getEventType() == EventType.REGISTER);
    }
}).followedBy("authentication").where(new SimpleCondition<Event>() {
    @Override
    public boolean filter<Event value> throws Exception {
        return (value.getEventType() == EventType.AUTHENTICATION);
    }
}).where(new IterativeCondition<Event>() {
    @Override
    public boolean filter(Event value, Context<Event> ctx) throws Exception {
        for (Event event : ctx.getEventsForPattern("register")) {
            if (value.getEventTime() - event.getEventTime() <= 1000 * 60 * 5) {
                return true;
            }
        }
        return false;
    }
}).followedBy("transaction").where(new SimpleCondition<Event>() {
    @Override
    public boolean filter<Event value> throws Exception {
        return (value.getEventType() == EventType.TRANSACTION && value.getAmount() > 1000.00);
    }
}).where(new IterativeCondition<Event>() {
    @Override
    public boolean filter(Event value, Context<Event> ctx) throws Exception {
        for (Event event : ctx.getEventsForPattern("authentication")) {
            if (value.getEventTime() - event.getEventTime() <= 1000 * 60 * 60) {
                return true;
            }
        }
        return false;
    }
}).timesOrMore(2);

您可以看到我使用了 2 个 IterativeConditions 来处理时间约束。有没有更好的方法让代码更简洁?

正如您所说,您现在只能对 CEP 库中的整个模式应用一个时间约束。你可以做的是将你的模式分成 2 个子模式。首先应用将查找 REGISTER -> AUTHENTICATE 并从中生成复杂事件的模式(我们将其命名为 REGISTER_AUTHENTICATED)。然后在后续模式中使用它 REGISTER_AUTHENTICATED -> 2* TRANSACTIONS.

然后您可以对这两种模式应用两个时间限制。