是否可以在 apache flink CEP 中处理多个流?
Is it possible to process multiple streams in apache flink CEP?
我的问题是,如果我们有两个原始事件流,即 Smoke 和 Temperature,我们想知道复杂事件是否即 Fire 是通过将运算符应用于原始流而发生的,我们可以在 Flink 中做到这一点吗?
我问这个问题是因为我目前看到的所有Flink CEP的例子都只有一个输入流。如有不妥请指正
简短回答 - 是的,您可以根据来自不同流源的事件类型读取和处理多个流和触发规则。
长答案 - 我有一个有点类似的要求,我的回答是基于你正在阅读来自不同 kafka 主题的不同流的假设。
阅读不同的主题,这些主题在单一来源中流式传输不同的事件:
FlinkKafkaConsumer010<BAMEvent> kafkaSource = new FlinkKafkaConsumer010<>(
Arrays.asList("topicStream1", "topicStream2", "topicStream3"),
new StringSerializerToEvent(),
props);
kafkaSource.assignTimestampsAndWatermarks(new
TimestampAndWatermarkGenerator());
DataStream<BAMEvent> events = env.addSource(kafkaSource)
.filter(Objects::nonNull);
序列化程序读取数据并将它们解析为具有通用格式 - 例如。
@Data
public class BAMEvent {
private String keyid; //If key based partitioning is needed
private String eventName; // For different types of events
private String eventId; // Any other field you need
private long timestamp; // For event time based processing
public String toString(){
return eventName + " " + timestamp + " " + eventId + " " + correlationID;
}
}
然后就很简单了,根据事件名定义规则,对比事件名定义规则(也可以定义复杂的规则如下):
Pattern.<BAMEvent>begin("first")
.where(new SimpleCondition<BAMEvent>() {
private static final long serialVersionUID = 1390448281048961616L;
@Override
public boolean filter(BAMEvent event) throws Exception {
return event.getEventName().equals("event1");
}
})
.followedBy("second")
.where(new IterativeCondition<BAMEvent>() {
private static final long serialVersionUID = -9216505110246259082L;
@Override
public boolean filter(BAMEvent secondEvent, Context<BAMEvent> ctx) throws Exception {
if (!secondEvent.getEventName().equals("event2")) {
return false;
}
for (BAMEvent firstEvent : ctx.getEventsForPattern("first")) {
if (secondEvent.getEventId = firstEvent.getEventId()) {
return true;
}
}
return false;
}
})
.within(withinTimeRule);
我希望这能让您想到将一个或多个不同的流整合在一起。
我想知道是否可以完成严格链接(而不是 followedBy,如果可以使用 next),因为在给定的流中,特定时间戳可能有很多事件。所以说时间 t1-:a,b,c - 这三个事件来了,时间 t2-:a2,b2,c2 来到 flink 引擎。所以,我想知道我们如何获得 event(a).next(a2),因为它可能永远不会是这种情况,因为系列会是这样的 - :
一种
b
C
a2
b2
c2
但是,如果 CEP 模块处理事件时将一个时间戳视为单个事件,那么这是有道理的。
我的问题是,如果我们有两个原始事件流,即 Smoke 和 Temperature,我们想知道复杂事件是否即 Fire 是通过将运算符应用于原始流而发生的,我们可以在 Flink 中做到这一点吗?
我问这个问题是因为我目前看到的所有Flink CEP的例子都只有一个输入流。如有不妥请指正
简短回答 - 是的,您可以根据来自不同流源的事件类型读取和处理多个流和触发规则。
长答案 - 我有一个有点类似的要求,我的回答是基于你正在阅读来自不同 kafka 主题的不同流的假设。
阅读不同的主题,这些主题在单一来源中流式传输不同的事件:
FlinkKafkaConsumer010<BAMEvent> kafkaSource = new FlinkKafkaConsumer010<>(
Arrays.asList("topicStream1", "topicStream2", "topicStream3"),
new StringSerializerToEvent(),
props);
kafkaSource.assignTimestampsAndWatermarks(new
TimestampAndWatermarkGenerator());
DataStream<BAMEvent> events = env.addSource(kafkaSource)
.filter(Objects::nonNull);
序列化程序读取数据并将它们解析为具有通用格式 - 例如。
@Data
public class BAMEvent {
private String keyid; //If key based partitioning is needed
private String eventName; // For different types of events
private String eventId; // Any other field you need
private long timestamp; // For event time based processing
public String toString(){
return eventName + " " + timestamp + " " + eventId + " " + correlationID;
}
}
然后就很简单了,根据事件名定义规则,对比事件名定义规则(也可以定义复杂的规则如下):
Pattern.<BAMEvent>begin("first")
.where(new SimpleCondition<BAMEvent>() {
private static final long serialVersionUID = 1390448281048961616L;
@Override
public boolean filter(BAMEvent event) throws Exception {
return event.getEventName().equals("event1");
}
})
.followedBy("second")
.where(new IterativeCondition<BAMEvent>() {
private static final long serialVersionUID = -9216505110246259082L;
@Override
public boolean filter(BAMEvent secondEvent, Context<BAMEvent> ctx) throws Exception {
if (!secondEvent.getEventName().equals("event2")) {
return false;
}
for (BAMEvent firstEvent : ctx.getEventsForPattern("first")) {
if (secondEvent.getEventId = firstEvent.getEventId()) {
return true;
}
}
return false;
}
})
.within(withinTimeRule);
我希望这能让您想到将一个或多个不同的流整合在一起。
我想知道是否可以完成严格链接(而不是 followedBy,如果可以使用 next),因为在给定的流中,特定时间戳可能有很多事件。所以说时间 t1-:a,b,c - 这三个事件来了,时间 t2-:a2,b2,c2 来到 flink 引擎。所以,我想知道我们如何获得 event(a).next(a2),因为它可能永远不会是这种情况,因为系列会是这样的 - : 一种 b C a2 b2 c2
但是,如果 CEP 模块处理事件时将一个时间戳视为单个事件,那么这是有道理的。