基于第三个事件链接 Flink 中的两个事件
Linking two events in Flink based on a third event
如果两个事件流入 Flink,是否可以使用它们之后的第三个事件中的信息(使用 DataStream API 或 CEP)将它们逻辑连接起来?例如,下例中的第三个事件是否可以根据其 right_id 和 left_id 用于 link 前两个事件?
ID: AAAA
ID: BBBB
ID: ZZZZ, right_id: AAAA, left_id: BBBB
这是一个非常基本的 CEP 用例。代码看起来像这样...
// data stream creation
DataStream<Event> myStream = ...
// cep pattern definition
Pattern<Event, ?> myPattern = Pattern.<Event>begin("first_event")
.followedBy("second_event")
.followedBy("match_event");
// cep pattern stream: apply pattern to stream
PatternStream<Event> myPatternStream = CEP.pattern(myStream, myPattern);
// create new data stream from pattern matches
DataStream<CEPEvent> myCEPEvent = myPatternStream.flatSelect(
(Map<String, Event> pattern, Collector<CEPEvent> out) -> {
// load potential event sequence matches
Event first_event = pattern.get("first_event");
Event second_event = pattern.get("second_event");
Event match_event = pattern.get("match_event");
// test event sequences
if (match_event.right_id.equals(first_event.ID)
&& match_event.left_id.equals(second_event.ID)
){out.collect(new CEPEvent("successful cep hit"));}
}
);
如果两个事件流入 Flink,是否可以使用它们之后的第三个事件中的信息(使用 DataStream API 或 CEP)将它们逻辑连接起来?例如,下例中的第三个事件是否可以根据其 right_id 和 left_id 用于 link 前两个事件?
ID: AAAA
ID: BBBB
ID: ZZZZ, right_id: AAAA, left_id: BBBB
这是一个非常基本的 CEP 用例。代码看起来像这样...
// data stream creation
DataStream<Event> myStream = ...
// cep pattern definition
Pattern<Event, ?> myPattern = Pattern.<Event>begin("first_event")
.followedBy("second_event")
.followedBy("match_event");
// cep pattern stream: apply pattern to stream
PatternStream<Event> myPatternStream = CEP.pattern(myStream, myPattern);
// create new data stream from pattern matches
DataStream<CEPEvent> myCEPEvent = myPatternStream.flatSelect(
(Map<String, Event> pattern, Collector<CEPEvent> out) -> {
// load potential event sequence matches
Event first_event = pattern.get("first_event");
Event second_event = pattern.get("second_event");
Event match_event = pattern.get("match_event");
// test event sequences
if (match_event.right_id.equals(first_event.ID)
&& match_event.left_id.equals(second_event.ID)
){out.collect(new CEPEvent("successful cep hit"));}
}
);