如何在基于 window 的 CEP 中触发一行代码
How to trigger a line of code in window based CEP
我有一个事件流,想统计特定时间段内的事件数以找出事件损失。
我的代码类似于以下内容:
DataStream<DataEvent> dataStream = ...;
dataStream
.windowAll(SlidingEventTimeWindows.of(Time.seconds(windowSize),Time.seconds(1)))
.process(new MyProcessWindowFunction());
我将 MyProcessWindowFunction class 定义为:
public class MyProcessWindowFunction
extends ProcessAllWindowFunction<DataEvent, String, TimeWindow> {
@Override
public void process(ProcessAllWindowFunction<DataEvent, String, TimeWindow>.Context context, Iterable<DataEvent> iterable, Collector<String> collector) throws Exception {
long count = 0;
for (DataEvent dataEvent : iterable) {
count++;
}
if ()
collector.collect("Window: " + context.window() + "count: " + count);
}
}
我的问题是如何使用计数值来比较它并找到事件丢失。正如我理解的那样,这个过程函数将创建一个由收集器收集的字符串流。但是,我想在每次滑动结束时发现事件丢失后立即做一些事情window。
感谢您的帮助。
最好的问候,
听起来您想按照以下思路做一些事情:
datastream
.map(new Tuple2<>(event.sensorId, 1))
.keyBy(t -> t.f0)
.window(SlidingEventTimeWindows.of(Time.seconds(windowSize),Time.seconds(1)))
.reduce(t1, t2 -> new Tuple2<>(t1.f0, t1.f1 + t2.f1))
.filter(t -> new SensorShouldBeStopped(t))
.addSink(...);
我有一个事件流,想统计特定时间段内的事件数以找出事件损失。 我的代码类似于以下内容:
DataStream<DataEvent> dataStream = ...;
dataStream
.windowAll(SlidingEventTimeWindows.of(Time.seconds(windowSize),Time.seconds(1)))
.process(new MyProcessWindowFunction());
我将 MyProcessWindowFunction class 定义为:
public class MyProcessWindowFunction
extends ProcessAllWindowFunction<DataEvent, String, TimeWindow> {
@Override
public void process(ProcessAllWindowFunction<DataEvent, String, TimeWindow>.Context context, Iterable<DataEvent> iterable, Collector<String> collector) throws Exception {
long count = 0;
for (DataEvent dataEvent : iterable) {
count++;
}
if ()
collector.collect("Window: " + context.window() + "count: " + count);
}
}
我的问题是如何使用计数值来比较它并找到事件丢失。正如我理解的那样,这个过程函数将创建一个由收集器收集的字符串流。但是,我想在每次滑动结束时发现事件丢失后立即做一些事情window。
感谢您的帮助。 最好的问候,
听起来您想按照以下思路做一些事情:
datastream
.map(new Tuple2<>(event.sensorId, 1))
.keyBy(t -> t.f0)
.window(SlidingEventTimeWindows.of(Time.seconds(windowSize),Time.seconds(1)))
.reduce(t1, t2 -> new Tuple2<>(t1.f0, t1.f1 + t2.f1))
.filter(t -> new SensorShouldBeStopped(t))
.addSink(...);