在 Flink CEP 中是否有处理多个 "temporal constraints" 的解决方法?
Is there a work-around to handle multiple "temporal constraints" in Flink CEP?
如 CEP 文档 (https://ci.apache.org/projects/flink/flink-docs-release-1.5/dev/libs/cep.html) 中所述,模式序列中只允许一个时间约束,我正在努力寻找一种方法来处理包含 2 个时间约束的业务案例。
我需要监控一些业务事件,并对满足以下规则的事件发出警报:
- 注册了一个新账号
- 注册后5分钟内完成账号认证
- 账户在接下来的1小时内至少完成了2笔交易金额大于1000.00的交易。
代码是这样的:
Pattern<Event, ?> pattern = Pattern.<Event>begin("register").where(new SimpleCondition<Event>() {
@Override
public boolean filter<Event value> throws Exception {
return (value.getEventType() == EventType.REGISTER);
}
}).followedBy("authentication").where(new SimpleCondition<Event>() {
@Override
public boolean filter<Event value> throws Exception {
return (value.getEventType() == EventType.AUTHENTICATION);
}
}).where(new IterativeCondition<Event>() {
@Override
public boolean filter(Event value, Context<Event> ctx) throws Exception {
for (Event event : ctx.getEventsForPattern("register")) {
if (value.getEventTime() - event.getEventTime() <= 1000 * 60 * 5) {
return true;
}
}
return false;
}
}).followedBy("transaction").where(new SimpleCondition<Event>() {
@Override
public boolean filter<Event value> throws Exception {
return (value.getEventType() == EventType.TRANSACTION && value.getAmount() > 1000.00);
}
}).where(new IterativeCondition<Event>() {
@Override
public boolean filter(Event value, Context<Event> ctx) throws Exception {
for (Event event : ctx.getEventsForPattern("authentication")) {
if (value.getEventTime() - event.getEventTime() <= 1000 * 60 * 60) {
return true;
}
}
return false;
}
}).timesOrMore(2);
您可以看到我使用了 2 个 IterativeConditions 来处理时间约束。有没有更好的方法让代码更简洁?
正如您所说,您现在只能对 CEP 库中的整个模式应用一个时间约束。你可以做的是将你的模式分成 2 个子模式。首先应用将查找 REGISTER -> AUTHENTICATE 并从中生成复杂事件的模式(我们将其命名为 REGISTER_AUTHENTICATED)。然后在后续模式中使用它 REGISTER_AUTHENTICATED -> 2* TRANSACTIONS.
然后您可以对这两种模式应用两个时间限制。
如 CEP 文档 (https://ci.apache.org/projects/flink/flink-docs-release-1.5/dev/libs/cep.html) 中所述,模式序列中只允许一个时间约束,我正在努力寻找一种方法来处理包含 2 个时间约束的业务案例。
我需要监控一些业务事件,并对满足以下规则的事件发出警报:
- 注册了一个新账号
- 注册后5分钟内完成账号认证
- 账户在接下来的1小时内至少完成了2笔交易金额大于1000.00的交易。
代码是这样的:
Pattern<Event, ?> pattern = Pattern.<Event>begin("register").where(new SimpleCondition<Event>() {
@Override
public boolean filter<Event value> throws Exception {
return (value.getEventType() == EventType.REGISTER);
}
}).followedBy("authentication").where(new SimpleCondition<Event>() {
@Override
public boolean filter<Event value> throws Exception {
return (value.getEventType() == EventType.AUTHENTICATION);
}
}).where(new IterativeCondition<Event>() {
@Override
public boolean filter(Event value, Context<Event> ctx) throws Exception {
for (Event event : ctx.getEventsForPattern("register")) {
if (value.getEventTime() - event.getEventTime() <= 1000 * 60 * 5) {
return true;
}
}
return false;
}
}).followedBy("transaction").where(new SimpleCondition<Event>() {
@Override
public boolean filter<Event value> throws Exception {
return (value.getEventType() == EventType.TRANSACTION && value.getAmount() > 1000.00);
}
}).where(new IterativeCondition<Event>() {
@Override
public boolean filter(Event value, Context<Event> ctx) throws Exception {
for (Event event : ctx.getEventsForPattern("authentication")) {
if (value.getEventTime() - event.getEventTime() <= 1000 * 60 * 60) {
return true;
}
}
return false;
}
}).timesOrMore(2);
您可以看到我使用了 2 个 IterativeConditions 来处理时间约束。有没有更好的方法让代码更简洁?
正如您所说,您现在只能对 CEP 库中的整个模式应用一个时间约束。你可以做的是将你的模式分成 2 个子模式。首先应用将查找 REGISTER -> AUTHENTICATE 并从中生成复杂事件的模式(我们将其命名为 REGISTER_AUTHENTICATED)。然后在后续模式中使用它 REGISTER_AUTHENTICATED -> 2* TRANSACTIONS.
然后您可以对这两种模式应用两个时间限制。