Apache Flink CEP 中没有事件
Absence of event in Apache Flink CEP
我是 Apache Flink CEP 的新手,我正在努力检测简单的事件缺失。
我要检测的是具有特定 ID 的 CurrencyEvent 类型的事件是否在特定时间内未发生 .我想在每次 3000 毫秒后事件未发生时检测是否存在此类事件。
我的模式代码如下所示:
Pattern<CurrencyEvent, ?> myPattern = Pattern.<Event>begin("CurrencyEvent")
.subtype(CurrencyEvent.class)
.where(new SimpleCondition<CurrencyEvent>() {
@Override
public boolean filter(CurrencyEvent currencyEvent) throws Exception {
return currencyEvent.getId().equalsIgnoreCase("usd");
}
})
.within(Time.milliseconds(3000L));
所以现在我的想法是使用超时函数来检测超时事件:
DataStreamSource<Event> events = env.addSource(new TestSource(
Arrays.asList(
basicCurrencyWithMivLevelEvent("EUR", 100L, Arrays.asList("1", "2"), 200D),
basicCurrencyWithMivLevelEvent("USD", 100L, Arrays.asList("1", "2"), 200D),
basicCurrencyWithMivLevelEvent("EUR", 100L, Arrays.asList("1", "2"), 200D)
),
1636040364820L, // initial timestamp for the first element
7000 // 7 seconds between each event
));
PatternStream<Event> patternStream = CEP.pattern(
events,
(Pattern<Event, ?>) myPattern
);
OutputTag<Alarm> tag = new OutputTag<Alarm>("currency-timeout"){};
PatternFlatTimeoutFunction<Event, Alarm> eventAlarmTimeoutPatternFunction = (patterns, timestamp, ctx) -> {
System.out.println("New alarm, since after 3 seconds an event with id=usd is not detected");
//TODO: call collect
};
PatternFlatSelectFunction<Event, Alarm> eventAlarmPatternSelectFunction = (patterns, ctx) -> {
System.out.println("Select! (we can ignore it) " + patterns);
// ignore matched events
};
return patternStream.flatSelect(
tag,
eventAlarmTimeoutPatternFunction,
TypeInformation.of(Alarm.class),
eventAlarmPatternSelectFunction
);
我的测试源使用的是事件时间戳和水印,如下图:
public class TestSource implements SourceFunction<Event> {
private final List<Event> events;
private final long initialTimestamp;
private final long timeBetweenInMillis;
public TestSource(List<Event> events, long initialTimestamp, long timeBetweenInMillis){
this.events = events;
this.initialTimestamp = initialTimestamp;
this.timeBetweenInMillis = timeBetweenInMillis;
}
@Override
public void run(SourceContext<Event> sourceContext) throws InterruptedException {
long timestamp = this.initialTimestamp;
for(Event event: this.events){
sourceContext.collectWithTimestamp(event, timestamp);
sourceContext.emitWatermark(new Watermark(timestamp));
timestamp+=this.timeBetweenInMillis;
}
}
@Override
public void cancel() {
}
}
我正在使用 TimeCharacteristics.EventTime。
由于 window 时间(3 秒)低于每个事件之间的事件时间差(7 秒),我希望得到一些超时事件,但我得到 0.
CEP Pattern
匹配一个或多个事件的序列; within(interval)
子句添加了一个额外的约束,即序列中的所有事件都必须在指定的 间隔 内发生。当部分匹配超时时,这可以在 TimedOutPartialMatchHandler
.
中捕获
在您的情况下,由于成功匹配的模式由单个事件组成,因此不会有部分匹配,并且匹配永远不会超时。 (您的匹配序列总是少于 3 秒。)
您可以做的是扩展模式定义以包含第二个事件,以便在 3 秒内必须有一个开始事件后跟另一个事件才能匹配。当第二个事件丢失时,您将有一个超时的部分匹配。
为了比 CEP 提供的实施涉及丢失事件的用例更灵活,您可以使用带计时器的 KeyedProcessFunction
。
我是 Apache Flink CEP 的新手,我正在努力检测简单的事件缺失。
我要检测的是具有特定 ID 的 CurrencyEvent 类型的事件是否在特定时间内未发生 .我想在每次 3000 毫秒后事件未发生时检测是否存在此类事件。
我的模式代码如下所示:
Pattern<CurrencyEvent, ?> myPattern = Pattern.<Event>begin("CurrencyEvent")
.subtype(CurrencyEvent.class)
.where(new SimpleCondition<CurrencyEvent>() {
@Override
public boolean filter(CurrencyEvent currencyEvent) throws Exception {
return currencyEvent.getId().equalsIgnoreCase("usd");
}
})
.within(Time.milliseconds(3000L));
所以现在我的想法是使用超时函数来检测超时事件:
DataStreamSource<Event> events = env.addSource(new TestSource(
Arrays.asList(
basicCurrencyWithMivLevelEvent("EUR", 100L, Arrays.asList("1", "2"), 200D),
basicCurrencyWithMivLevelEvent("USD", 100L, Arrays.asList("1", "2"), 200D),
basicCurrencyWithMivLevelEvent("EUR", 100L, Arrays.asList("1", "2"), 200D)
),
1636040364820L, // initial timestamp for the first element
7000 // 7 seconds between each event
));
PatternStream<Event> patternStream = CEP.pattern(
events,
(Pattern<Event, ?>) myPattern
);
OutputTag<Alarm> tag = new OutputTag<Alarm>("currency-timeout"){};
PatternFlatTimeoutFunction<Event, Alarm> eventAlarmTimeoutPatternFunction = (patterns, timestamp, ctx) -> {
System.out.println("New alarm, since after 3 seconds an event with id=usd is not detected");
//TODO: call collect
};
PatternFlatSelectFunction<Event, Alarm> eventAlarmPatternSelectFunction = (patterns, ctx) -> {
System.out.println("Select! (we can ignore it) " + patterns);
// ignore matched events
};
return patternStream.flatSelect(
tag,
eventAlarmTimeoutPatternFunction,
TypeInformation.of(Alarm.class),
eventAlarmPatternSelectFunction
);
我的测试源使用的是事件时间戳和水印,如下图:
public class TestSource implements SourceFunction<Event> {
private final List<Event> events;
private final long initialTimestamp;
private final long timeBetweenInMillis;
public TestSource(List<Event> events, long initialTimestamp, long timeBetweenInMillis){
this.events = events;
this.initialTimestamp = initialTimestamp;
this.timeBetweenInMillis = timeBetweenInMillis;
}
@Override
public void run(SourceContext<Event> sourceContext) throws InterruptedException {
long timestamp = this.initialTimestamp;
for(Event event: this.events){
sourceContext.collectWithTimestamp(event, timestamp);
sourceContext.emitWatermark(new Watermark(timestamp));
timestamp+=this.timeBetweenInMillis;
}
}
@Override
public void cancel() {
}
}
我正在使用 TimeCharacteristics.EventTime。
由于 window 时间(3 秒)低于每个事件之间的事件时间差(7 秒),我希望得到一些超时事件,但我得到 0.
CEP Pattern
匹配一个或多个事件的序列; within(interval)
子句添加了一个额外的约束,即序列中的所有事件都必须在指定的 间隔 内发生。当部分匹配超时时,这可以在 TimedOutPartialMatchHandler
.
在您的情况下,由于成功匹配的模式由单个事件组成,因此不会有部分匹配,并且匹配永远不会超时。 (您的匹配序列总是少于 3 秒。)
您可以做的是扩展模式定义以包含第二个事件,以便在 3 秒内必须有一个开始事件后跟另一个事件才能匹配。当第二个事件丢失时,您将有一个超时的部分匹配。
为了比 CEP 提供的实施涉及丢失事件的用例更灵活,您可以使用带计时器的 KeyedProcessFunction
。