如何在基于 window 的 CEP 中触发一行代码

How to trigger a line of code in window based CEP

我有一个事件流,想统计特定时间段内的事件数以找出事件损失。 我的代码类似于以下内容:

DataStream<DataEvent> dataStream = ...;

dataStream
.windowAll(SlidingEventTimeWindows.of(Time.seconds(windowSize),Time.seconds(1)))
.process(new MyProcessWindowFunction());

我将 MyProcessWindowFunction class 定义为:

public class MyProcessWindowFunction
        extends ProcessAllWindowFunction<DataEvent, String, TimeWindow> {

    @Override
    public void process(ProcessAllWindowFunction<DataEvent, String, TimeWindow>.Context context, Iterable<DataEvent> iterable, Collector<String> collector) throws Exception {
        long count = 0;
        for (DataEvent dataEvent : iterable) {
            count++;
        }
        if ()
        collector.collect("Window: " + context.window() + "count: " + count);
    }
}

我的问题是如何使用计数值来比较它并找到事件丢失。正如我理解的那样,这个过程函数将创建一个由收集器收集的字符串流。但是,我想在每次滑动结束时发现事件丢失后立即做一些事情window。

感谢您的帮助。 最好的问候,

听起来您想按照以下思路做一些事情:

datastream
  .map(new Tuple2<>(event.sensorId, 1))
  .keyBy(t -> t.f0)
  .window(SlidingEventTimeWindows.of(Time.seconds(windowSize),Time.seconds(1)))
  .reduce(t1, t2 -> new Tuple2<>(t1.f0, t1.f1 + t2.f1)) 
  .filter(t -> new SensorShouldBeStopped(t))
  .addSink(...);