Timeout CEP pattern if next event not received in a given interval of time

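I am new to Flink and I am trying out a POC in which a CEP pattern should time out if no event is received for x amount of time, where x is greater than the time specified in the pattern's within period.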

public class MyCEPApplication {


    public static void main(String[] args) throws Exception {


        StreamExecutionEnvironment streamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();

        streamExecutionEnvironment.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "localhost:9092");
        properties.setProperty("group.id", "consumer_group");

        FlinkKafkaConsumer<String> inputSignal = new FlinkKafkaConsumer<>("consumer_topic",
                new SimpleStringSchema(),
                properties);

        DataStream<String> inputSignalKafka = streamExecutionEnvironment.addSource(inputSignal);


        DataStream<MyEvent> eventDataStream = inputSignalKafka.map(new MapFunction<String, MyEvent>() {
            @Override
            public MyEvent map(String value) throws Exception {
                // Stamps each record with the wall-clock arrival time, not a timestamp carried by the event itself.
                MyEvent myEvent = new MyEvent();
                myEvent.setJsonObject(new JSONObject(value));
                myEvent.setAttribute("time", System.currentTimeMillis());
                return myEvent;
            }
        }).assignTimestampsAndWatermarks(new AssignerWithPunctuatedWatermarks<MyEvent>() {
            @Override
            public long extractTimestamp(MyEvent event, long currentTimestamp) {
                System.out.println("TIMESTAMP: " +(long)event.getAttribute("time").get());
                System.out.println("Time Difference: " +((long)(event.getAttribute("time").get()) - timeDifference));
                timeDifference = (long)(event.getAttribute("time").get());
                return (long)event.getAttribute("time").get();
            }
            @Override
            public Watermark checkAndGetNextWatermark(MyEvent lastElement, long extractedTimestamp) {
                return new Watermark(extractedTimestamp);
            }
        });


        eventDataStream.print("Source=======================================>");
        
        Pattern<MyEvent, ?> pattern =
                Pattern.<MyEvent>begin("start")
                    .where(new SimpleCondition<MyEvent>() {
                        @Override
                        public boolean filter(MyEvent event) {
                            return event.equals("Event1");
                        }
                    })
                    .next("end")
                    .where(new SimpleCondition<MyEvent>() {
                        @Override
                        public boolean filter(MyEvent event) {
                            return event.equals("Event2");
                        }
                    }).within(Time.seconds(10));


            PatternStream<MyEvent> patternStream = cepPatternMatching.compile(pattern, eventDataStream);

            OutputTag<MyEvent> timedout = new OutputTag<MyEvent>("timedout") {};
            


            SingleOutputStreamOperator<MyEvent> result = patternStream.flatSelect(
                    timedout,
                    new PatternFlatTimeoutFunction<MyEvent,MyEvent>() {
                        @Override
                        public void timeout(Map<String, List<MyEvent>> pattern, long timeoutTimestamp, Collector<MyEvent> out) throws Exception {
                            if(null != pattern.get("CustomerId")){
                                for (MyEvent timedOutEvent :pattern.get("CustomerId")){
                                    System.out.println("TimedOut Event : "+timedOutEvent.getField(0));
                                    out.collect(timedOutEvent);
                                }
                            }
                        }
                    },
                    new PatternFlatSelectFunction<MyEvent, MyEvent>() {
                        @Override
                        public void flatSelect(Map<String, List<MyEvent>> pattern,
                                               Collector<MyEvent> out) throws Exception {

                            out.collect(pattern.get("CustomerId").get(0));
                        }
                    }
            );


            result.print("=============CEP Pattern Detected==================");


            DataStream<MyEvent> timedOut = result.getSideOutput(timedout);
           
            timedOut.print("=============CEP TimedOut Pattern Detected==================");


        streamExecutionEnvironment.execute("CEPtest");

    }
}

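Even when no event is received for 10 seconds, the timed-out event is not printed. I even tried commenting out the PatternFlatSelectFunction method. Is there a way, or a workaround, to make the pattern time out if no event is received within a given x seconds? Other people have asked the same question in the links below, but none of the solutions there worked for me. Please help me solve this problem.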

1)

2)https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/libs/cep.html#handling-timed-out-partial-patterns

3)https://github.com/ververica/flink-training-exercises/blob/master/src/main/java/com/ververica/flinktraining/solutions/datastream_java/cep/LongRidesCEPSolution.java

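Your application is using event time, so you need to arrange for a sufficiently large Watermark to be generated even though no events are arriving. If you want to artificially advance the current watermark when the source is idle, you can use this example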
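To make the idea of advancing the watermark during idle periods concrete, here is a minimal, library-independent sketch of the bookkeeping involved. It is not the code from the linked example: the class name `IdleAwareWatermark`, the `maxIdleMillis` threshold, and the rule of adding the idle wall-clock time to the largest timestamp seen are all assumptions made for illustration. In Flink 1.10 this logic would presumably live in an `AssignerWithPeriodicWatermarks`, with `onEvent` called from `extractTimestamp` and `currentWatermark` called from `getCurrentWatermark`.

```java
/**
 * Hypothetical helper (not part of Flink): tracks the largest event timestamp
 * and, once the source has been idle longer than maxIdleMillis, pushes the
 * watermark forward by the elapsed wall-clock time.
 */
class IdleAwareWatermark {
    private final long maxIdleMillis;           // assumed idle threshold
    private long maxEventTimestamp = Long.MIN_VALUE;
    private long lastEventWallClock;            // wall-clock time of the last event
    private long lastEmitted = Long.MIN_VALUE;  // keeps the watermark monotonic
    private boolean seenEvent = false;

    IdleAwareWatermark(long maxIdleMillis) {
        this.maxIdleMillis = maxIdleMillis;
    }

    /** Record an event's timestamp together with the wall-clock time it arrived. */
    void onEvent(long eventTimestamp, long wallClockNow) {
        maxEventTimestamp = Math.max(maxEventTimestamp, eventTimestamp);
        lastEventWallClock = wallClockNow;
        seenEvent = true;
    }

    /** Compute the watermark to emit at the given wall-clock time. */
    long currentWatermark(long wallClockNow) {
        if (!seenEvent) {
            return Long.MIN_VALUE; // nothing seen yet, so nothing to advance
        }
        long idleFor = wallClockNow - lastEventWallClock;
        long candidate = idleFor > maxIdleMillis
                ? maxEventTimestamp + idleFor   // source is idle: advance artificially
                : maxEventTimestamp;            // source is active: follow the events
        lastEmitted = Math.max(lastEmitted, candidate); // never move backwards
        return lastEmitted;
    }

    public static void main(String[] args) {
        IdleAwareWatermark wm = new IdleAwareWatermark(5_000L);
        wm.onEvent(1_000L, 1_000L);
        System.out.println(wm.currentWatermark(2_000L));   // prints 1000
        System.out.println(wm.currentWatermark(12_000L));  // prints 12000
    }
}
```

In the usage above, an event stamped 1000 starts a partial match with a 10-second window ending at 11000. With no further events, the watermark stays at 1000 until the idle threshold is exceeded and then jumps to 12000, which is past the end of the window, so a timeout handler could fire. Adding the idle time only makes sense when event timestamps are close to wall-clock time, as they are in the question, where they come from `System.currentTimeMillis()`.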

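Given that your events do not carry event-time timestamps, why not simply use processing time instead, and thereby avoid this problem? (Note, however, the limitation mentioned in .)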