Timeout CEP pattern if next event not received in a given interval of time
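I am new to Flink and am building a POC in which a CEP pattern should time out if no event is received for longer than the interval specified in the pattern's within(...) clause.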
public class MyCEPApplication {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment streamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
        streamExecutionEnvironment.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "localhost:9092");
        properties.setProperty("group.id", "consumer_group");
        FlinkKafkaConsumer<String> inputSignal = new FlinkKafkaConsumer<>("consumer_topic",
                new SimpleStringSchema(),
                properties);
        DataStream<String> inputSignalKafka = streamExecutionEnvironment.addSource(inputSignal);
        DataStream<MyEvent> eventDataStream = inputSignalKafka.map(new MapFunction<String, MyEvent>() {
            @Override
            public MyEvent map(String value) throws Exception {
                ItmAtomicEvent MyEvent = new ItmAtomicEvent();
                MyEvent.setJsonObject(new JSONObject(value));
                MyEvent.setAttribute("time",System.currentTimeMillis());
                return MyEvent;
            }
        }).assignTimestampsAndWatermarks(new AssignerWithPunctuatedWatermarks<MyEvent>() {
            @Override
            public long extractTimestamp(MyEvent event, long currentTimestamp) {
                System.out.println("TIMESTAMP: " +(long)event.getAttribute("time").get());
                System.out.println("Time Difference: " +((long)(event.getAttribute("time").get()) - timeDifference));
                timeDifference = (long)(event.getAttribute("time").get());
                return (long)event.getAttribute("time").get();
            }
            @Override
            public Watermark checkAndGetNextWatermark(MyEvent lastElement, long extractedTimestamp) {
                return new Watermark(extractedTimestamp);
            }
        });
        eventDataStream.print("Source=======================================>");
        Pattern<MyEvent, ?> pattern =
                Pattern.<MyEvent>begin("start")
                        .where(new SimpleCondition<MyEvent>() {
                            @Override
                            public boolean filter(MyEvent event) {
                                return event.equals("Event1");
                            }
                        })
                        .next("end")
                        .where(new SimpleCondition<MyEvent>() {
                            @Override
                            public boolean filter(MyEvent event) {
                                return event.equals("Event2");
                            }
                        }).within(Time.seconds(10));
        PatternStream<MyEvent> patternStream = cepPatternMatching.compile(pattern, eventDataStream);
        OutputTag<MyEvent> timedout = new OutputTag<MyEvent>("timedout") {};
        SingleOutputStreamOperator<MyEvent> result = patternStream.flatSelect(
                timedout,
                new PatternFlatTimeoutFunction<MyEvent,MyEvent>() {
                    @Override
                    public void timeout(Map<String, List<MyEvent>> pattern, long timeoutTimestamp, Collector<MyEvent> out) throws Exception {
                        if(null != pattern.get("CustomerId")){
                            for (MyEvent timedOutEvent :pattern.get("CustomerId")){
                                System.out.println("TimedOut Event : "+timedOutEvent.getField(0));
                                out.collect(timedOutEvent);
                            }
                        }
                    }
                },
                new PatternFlatSelectFunction<MyEvent, MyEvent>() {
                    @Override
                    public void flatSelect(Map<String, List<MyEvent>> pattern,
                                           Collector<MyEvent> out) throws Exception {
                        out.collect(pattern.get("CustomerId").get(0));
                    }
                }
        );
        result.print("=============CEP Pattern Detected==================");
        DataStream<MyEvent> timedOut = result.getSideOutput(timedout);
        timedOut.print("=============CEP TimedOut Pattern Detected==================");
        streamExecutionEnvironment.execute("CEPtest");
    }
}
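Even when no event is received for more than 10 seconds, the timed-out events are not printed. I even tried commenting out the PatternFlatSelectFunction code. Is there a way, or a workaround, to time out the pattern if no event is received within a given x seconds? Someone asked about the same problem before, and I tried the solution mentioned there, but nothing worked for me. Please help me resolve this.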
1)
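Your application is using event time, so you need to arrange for a sufficiently large Watermark to be generated even though no events are arriving. If you want to artificially advance the current watermark when the source is idle, you can use this example.
Given that your events don't have event-time timestamps, why don't you simply use processing time instead, and thereby avoid this problem? (Note, however, the limitation mentioned in ).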
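To make the point about watermarks concrete, here is a minimal sketch in plain Java. It is a toy model, not Flink code: the class WatermarkTimeoutSketch and its methods are hypothetical names invented for illustration. It assumes, as in the question, that event timestamps are taken from the wall clock, that each event emits a watermark equal to its own timestamp, and that a pending partial match times out only once the watermark reaches its deadline. The optional idle check imitates the idea of advancing the watermark artificially when the source is quiet.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy model of an event-time timeout driven by watermarks. Not Flink API. */
class WatermarkTimeoutSketch {
    private final long withinMillis;   // pattern window, like within(Time.seconds(10))
    private final long idleMillis;     // idle threshold; 0 disables idle advancement
    private long watermark = Long.MIN_VALUE;
    private long pendingDeadline = -1; // deadline of the pending partial match, -1 if none
    private long lastEventWallClock;
    private final List<Long> firedTimeouts = new ArrayList<>();

    WatermarkTimeoutSketch(long withinMillis, long idleMillis) {
        this.withinMillis = withinMillis;
        this.idleMillis = idleMillis;
    }

    /** A "start" event arrives; for brevity only the latest partial match is tracked. */
    void onEvent(long eventTimestamp, long wallClockNow) {
        // Punctuated watermark: every event pushes the watermark to its own timestamp.
        advanceWatermark(eventTimestamp);
        pendingDeadline = eventTimestamp + withinMillis;
        lastEventWallClock = wallClockNow;
    }

    /** Periodic check: if the source has been idle long enough, advance the watermark. */
    void onPeriodicTick(long wallClockNow) {
        if (idleMillis > 0 && wallClockNow - lastEventWallClock >= idleMillis) {
            // Assumption: event time tracks the wall clock, so "now" is a safe watermark.
            advanceWatermark(wallClockNow);
        }
    }

    /** The timeout fires only when the watermark passes the pending deadline. */
    private void advanceWatermark(long candidate) {
        watermark = Math.max(watermark, candidate);
        if (pendingDeadline >= 0 && watermark >= pendingDeadline) {
            firedTimeouts.add(pendingDeadline);
            pendingDeadline = -1;
        }
    }

    List<Long> firedTimeouts() {
        return firedTimeouts;
    }

    public static void main(String[] args) {
        // Without idle advancement: the watermark stalls at the last event.
        WatermarkTimeoutSketch stalled = new WatermarkTimeoutSketch(10_000, 0);
        stalled.onEvent(1_000, 1_000);
        stalled.onPeriodicTick(60_000);
        System.out.println("no idle handling: " + stalled.firedTimeouts());

        // With idle advancement: the tick after the deadline reports the timeout.
        WatermarkTimeoutSketch advancing = new WatermarkTimeoutSketch(10_000, 5_000);
        advancing.onEvent(1_000, 1_000);
        advancing.onPeriodicTick(12_000);
        System.out.println("with idle handling: " + advancing.firedTimeouts());
    }
}
```

In this toy model the first instance reports nothing until a later event moves the watermark, while the second reports the timeout at the first tick after the deadline. Switching to processing time amounts to letting the wall clock drive the timer directly, which is why it sidesteps the problem.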