Looks like a bug in flink-training-exercises for the CEP example

I got the CEP example from the URL below: https://github.com/dataArtisans/flink-training-exercises/blob/master/src/main/java/com/dataartisans/flinktraining/exercises/datastream_java/cep/LongRides.java

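The stated "goal for this exercise is to emit START events for taxi rides that have not been matched by an END event during the first 2 hours of the ride." However, judging from the code below, the pattern seems to find rides that HAVE been completed within 2 hours, rather than rides that have NOT been completed within 2 hours.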

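The pattern appears to match a Start event first, followed by an End event (!ride.isStart), within 2 hours. Doesn't that mean it finds the rides that were completed within 2 hours?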

Pattern<TaxiRide, TaxiRide> completedRides =
        Pattern.<TaxiRide>begin("start")
                .where(new SimpleCondition<TaxiRide>() {
                    @Override
                    public boolean filter(TaxiRide ride) throws Exception {
                        return ride.isStart;
                    }
                })
                .next("end")
                .where(new SimpleCondition<TaxiRide>() {
                    @Override
                    public boolean filter(TaxiRide ride) throws Exception {
                        return !ride.isStart;
                    }
                });

// We want to find rides that have NOT been completed within 120 minutes
PatternStream<TaxiRide> patternStream = CEP.pattern(keyedRides, completedRides.within(Time.minutes(120)));

I have improved the comments in the sample solution to make this clearer.

// We want to find rides that have NOT been completed within 120 minutes.
// This pattern matches rides that ARE completed.
// Below we will ignore rides that match this pattern, and emit those that timeout.
PatternStream<TaxiRide> patternStream = CEP.pattern(keyedRides, completedRides.within(Time.minutes(120)));

OutputTag<TaxiRide> timedout = new OutputTag<TaxiRide>("timedout"){};

SingleOutputStreamOperator<TaxiRide> longRides = patternStream.flatSelect(
        timedout,
        new TaxiRideTimedOut<TaxiRide>(),
        new FlatSelectNothing<TaxiRide>()
);
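
To make the "ignore the matches, emit the timeouts" idea concrete, here is a minimal plain-Java sketch of the same logic without Flink. It is only an illustration, not the exercise's actual implementation: `LongRidesSketch`, `RideEvent`, `findLongRides` and the `nowMillis` parameter (a rough stand-in for the watermark) are hypothetical names, and treating a ride of exactly 2 hours as timed out is a choice made for this sketch.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

final class LongRidesSketch {

    // Hypothetical, simplified stand-in for TaxiRide: only the fields this sketch needs.
    static final class RideEvent {
        final long rideId;
        final boolean isStart;
        final long timestampMillis;

        RideEvent(long rideId, boolean isStart, long timestampMillis) {
            this.rideId = rideId;
            this.isStart = isStart;
            this.timestampMillis = timestampMillis;
        }
    }

    static final long TWO_HOURS_MILLIS = 2 * 60 * 60 * 1000L;

    // Returns the ids of rides whose START was not followed by an END within 2 hours.
    // Assumes events are sorted by timestamp; nowMillis is the current event time.
    static List<Long> findLongRides(List<RideEvent> events, long nowMillis) {
        Map<Long, Long> openStarts = new LinkedHashMap<>(); // rideId -> start timestamp
        List<Long> timedOut = new ArrayList<>();

        for (RideEvent e : events) {
            if (e.isStart) {
                openStarts.put(e.rideId, e.timestampMillis);
            } else {
                Long start = openStarts.remove(e.rideId);
                // START followed by END in time: a "completed" match, which we ignore.
                // END arriving too late: the ride counts as timed out.
                if (start != null && e.timestampMillis - start >= TWO_HOURS_MILLIS) {
                    timedOut.add(e.rideId);
                }
            }
        }

        // Rides that are still open after 2 hours have timed out as well.
        for (Map.Entry<Long, Long> open : openStarts.entrySet()) {
            if (nowMillis - open.getValue() >= TWO_HOURS_MILLIS) {
                timedOut.add(open.getKey());
            }
        }
        return timedOut;
    }

    public static void main(String[] args) {
        long minute = 60 * 1000L;
        List<RideEvent> events = new ArrayList<>();
        events.add(new RideEvent(1, true, 0));             // ride 1 starts
        events.add(new RideEvent(2, true, 0));             // ride 2 starts
        events.add(new RideEvent(3, true, 10 * minute));   // ride 3 starts, never ends
        events.add(new RideEvent(1, false, 30 * minute));  // ride 1 ends after 30 minutes
        events.add(new RideEvent(2, false, 180 * minute)); // ride 2 ends after 3 hours

        System.out.println(findLongRides(events, 240 * minute)); // prints [2, 3]
    }
}
```

Ride 1 is the case the `completedRides` pattern matches, so it is dropped. Rides 2 and 3 correspond to partial matches that time out, which is what ends up in the `timedout` output.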