Absence of event in Apache Flink CEP

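I'm new to Apache Flink CEP and I'm struggling to detect a simple absence of an event.

What I want to detect is whether an event of type CurrencyEvent with a specific ID has not occurred within a certain amount of time. I want an alert each time 3000 ms pass without such an event occurring.

My pattern code looks like this: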

Pattern<Event, ?> myPattern = Pattern.<Event>begin("CurrencyEvent")
        .subtype(CurrencyEvent.class)
        .where(new SimpleCondition<CurrencyEvent>() {
            @Override
            public boolean filter(CurrencyEvent currencyEvent) throws Exception {
                return currencyEvent.getId().equalsIgnoreCase("usd");
            }
        })
        .within(Time.milliseconds(3000L));

So now my idea is to use a timeout function to detect the timed-out events:

    DataStreamSource<Event> events = env.addSource(new TestSource(
            Arrays.asList(
                    basicCurrencyWithMivLevelEvent("EUR", 100L, Arrays.asList("1", "2"), 200D),
                    basicCurrencyWithMivLevelEvent("USD", 100L, Arrays.asList("1", "2"), 200D),
                    basicCurrencyWithMivLevelEvent("EUR", 100L, Arrays.asList("1", "2"), 200D)
            ),
            1636040364820L, // initial timestamp for the first element
            7000 // 7 seconds between each event
    ));

    PatternStream<Event> patternStream = CEP.pattern(
            events,
            (Pattern<Event, ?>) myPattern
    );

    OutputTag<Alarm> tag = new OutputTag<Alarm>("currency-timeout"){};

    PatternFlatTimeoutFunction<Event, Alarm> eventAlarmTimeoutPatternFunction = (patterns, timestamp, ctx) -> {
        System.out.println("New alarm, since after 3 seconds an event with id=usd is not detected");
        //TODO: call collect

    };


    PatternFlatSelectFunction<Event, Alarm> eventAlarmPatternSelectFunction = (patterns, ctx) -> {
        System.out.println("Select! (we can ignore it) " + patterns);
        // ignore matched events
    };

    return patternStream.flatSelect(
            tag,
            eventAlarmTimeoutPatternFunction,
            TypeInformation.of(Alarm.class),
            eventAlarmPatternSelectFunction
    );

My test source uses event timestamps and watermarks, as shown below:

public class TestSource implements SourceFunction<Event> {

    private final List<Event> events;
    private final long initialTimestamp;
    private final long timeBetweenInMillis;

    public TestSource(List<Event> events, long initialTimestamp, long timeBetweenInMillis){
        this.events = events;
        this.initialTimestamp = initialTimestamp;
        this.timeBetweenInMillis = timeBetweenInMillis;
    }

    @Override
    public void run(SourceContext<Event> sourceContext) throws InterruptedException {
        long timestamp  = this.initialTimestamp;
        for(Event event: this.events){
            sourceContext.collectWithTimestamp(event, timestamp);
            sourceContext.emitWatermark(new Watermark(timestamp));
            timestamp+=this.timeBetweenInMillis;
        }
    }

    @Override
    public void cancel() {

    }

}

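I'm using TimeCharacteristic.EventTime.

Since the window time (3 seconds) is shorter than the event-time gap between consecutive events (7 seconds), I expected to get some timeout events, but I get 0.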

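A CEP Pattern matches a sequence of one or more events; the within(interval) clause adds the additional constraint that all events in the sequence must occur within the specified interval. When a partial match times out, this can be captured in a TimedOutPartialMatchHandler.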

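In your case, since a successfully matched pattern consists of a single event, there is never a partial match, and so a match can never time out. (Your matching sequences are always shorter than 3 seconds.)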

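What you can do is extend the pattern definition to include a second event, so that there must be a start event followed by another event within 3 seconds in order to match. When the second event is missing, you will have a partial match that times out.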
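To make the difference concrete, here is a minimal plain-Java sketch (deliberately not Flink code) of what a two-step pattern with a 3-second window does to your test data. The class and method names are made up for illustration, and it assumes in-order events with a watermark equal to each event's timestamp, as in your TestSource. It also assumes a partial match is discarded once the elapsed time reaches the window length; treat that boundary as an assumption rather than a statement about Flink's internals.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of "start followed by another event within N ms".
// Every event both completes the pending partial match (if it is in time)
// and opens a new one.
final class PartialMatchTimeoutSketch {

    /** Returns the timestamps of start events whose partial match timed out. */
    static List<Long> timedOutStarts(long[] eventTimestamps, long withinMillis) {
        List<Long> timedOut = new ArrayList<>();
        Long pendingStart = null; // start of the currently open partial match
        for (long ts : eventTimestamps) {
            if (pendingStart != null && ts - pendingStart >= withinMillis) {
                // The follow-up event came too late: the partial match timed out.
                timedOut.add(pendingStart);
            }
            // Otherwise the pending match completed normally; nothing to report.
            pendingStart = ts; // this event starts the next partial match
        }
        // The last partial match stays open: no later event or watermark
        // has been seen yet that could prove it timed out.
        return timedOut;
    }

    public static void main(String[] args) {
        // Three events, 7 seconds apart, window of 3 seconds.
        System.out.println(timedOutStarts(new long[] {0L, 7000L, 14000L}, 3000L));
    }
}
```

With a single-step pattern there is no `pendingStart` at all, which is why your timeout function is never called. Note also that the last start event is not reported here: nothing after it advances time far enough to expire it.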

For more flexibility than CEP offers when implementing use cases that involve missing events, you can use a KeyedProcessFunction with timers.
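The timer logic itself is small. Below is a rough plain-Java sketch of the bookkeeping such a function would do for one key; it is not Flink code, and `AbsenceTimerSketch`, `onEvent` and `onWatermark` are hypothetical names standing in for what you would do in processElement and onTimer. In a real KeyedProcessFunction the deadline would live in keyed state and be registered with the timer service. One detail worth copying: an element is processed before the watermark that carries its timestamp, so the sketch checks for overdue deadlines when an event arrives, before moving the deadline forward.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical single-key model: raise an alarm each time `timeoutMillis`
// of event time pass without a matching event.
final class AbsenceTimerSketch {
    private final long timeoutMillis;
    private boolean armed = false; // becomes true after the first matching event
    private long deadline;         // event time at which the next alarm is due
    private final List<Long> alarms = new ArrayList<>();

    AbsenceTimerSketch(long timeoutMillis) {
        this.timeoutMillis = timeoutMillis;
    }

    /** A matching event (for example id = "usd") arrived with this timestamp. */
    void onEvent(long timestamp) {
        fireDue(timestamp);                   // report gaps that ended before this event
        deadline = timestamp + timeoutMillis; // restart the countdown
        armed = true;
    }

    /** Event time advanced to this watermark. */
    void onWatermark(long watermark) {
        // A real job would also have to cope with the final Long.MAX_VALUE
        // watermark that a bounded source emits; this sketch does not.
        fireDue(watermark);
    }

    private void fireDue(long now) {
        while (armed && deadline <= now) {
            alarms.add(deadline);      // alarm stamped with the missed deadline
            deadline += timeoutMillis; // keep alerting every timeout interval
        }
    }

    List<Long> alarms() {
        return alarms;
    }
}
```

For example, with a 3000 ms timeout, a matching event at 0 followed by a watermark of 7000 yields alarms for the deadlines 3000 and 6000. Whether you want repeated alarms like this or only one per gap is a design choice; dropping the `deadline += timeoutMillis` re-arming (and disarming instead) would give the single-alarm variant.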