Apache Flink CEP 如何检测事件是否在 x 秒内未发生?
Apache Flink CEP how to detect if event did not occur within x seconds?
例如,A 之后应该在 10 秒内出现 B。我知道如何检测这种情况确实发生了(.next,.within),但如果 B 始终没有在时间窗口(window)
内出现,我想发送警报。
/**
 * Reads strings from the RabbitMQ queue "cep", watches for an "A" event followed (via
 * {@code next}) by a "B" event within 10 seconds, and prints whatever the timeout handler
 * emits on the "timedout" side output.
 *
 * @param args command-line arguments (unused)
 * @throws Exception if building or executing the job fails
 */
public static void main(String[] args) throws Exception {
    final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    // Checkpointing is required for exactly-once or at-least-once guarantees; it is left
    // switched off here (env.enableCheckpointing(1000) would turn it on).

    final RMQConnectionConfig connectionConfig = new RMQConnectionConfig.Builder()
            .setHost("localhost")
            .setPort(5672)
            .setVirtualHost("/")
            .setUserName("guest")
            .setPassword("guest")
            .build();

    // Arguments: connection config, queue name, use correlation ids (can be false if only
    // at-least-once is required), deserialization schema for turning messages into objects.
    final RMQSource<String> queueSource =
            new RMQSource<String>(connectionConfig, "cep", true, new SimpleStringSchema());

    // A non-parallel source is only required for exactly-once.
    final DataStream<String> inputStream = env.addSource(queueSource).setParallelism(1);
    inputStream.print();

    final SimpleCondition<String> isA = new SimpleCondition<String>() {
        @Override
        public boolean filter(String event) {
            return event.equals("A");
        }
    };
    final SimpleCondition<String> isB = new SimpleCondition<String>() {
        @Override
        public boolean filter(String event) {
            return event.equals("B");
        }
    };

    // "start" must be an A, "end" must be a B, and the pair has to complete within 10 seconds.
    Pattern<String, ?> simplePattern = Pattern.<String>begin("start")
            .where(isA)
            .next("end")
            .where(isB)
            .within(Time.seconds(10));

    PatternStream<String> timedOutPatternStream = CEP.pattern(inputStream, simplePattern);

    // Anonymous subclass, as in the usual OutputTag idiom.
    OutputTag<String> timedout = new OutputTag<String>("timedout") {};

    SingleOutputStreamOperator<String> timedOutNotificationsStream =
            timedOutPatternStream.flatSelect(
                    timedout,
                    new TimedOut<String>(),
                    new FlatSelectNothing<String>());

    // Only the side output is printed; the main output of flatSelect is not consumed here.
    DataStream<String> lateAlerts = timedOutNotificationsStream.getSideOutput(timedout);
    lateAlerts.print();

    env.execute("mynotification");
}
/**
 * Timeout handler that emits the fixed alert text "LATE!" each time it is invoked.
 *
 * <p>The element type parameter is deliberately named {@code T}: naming it {@code String}
 * would shadow {@code java.lang.String} inside this class and force an unchecked cast on
 * the alert literal. The output type is the real {@code java.lang.String}, so
 * {@code new TimedOut<String>()} still yields a
 * {@code PatternFlatTimeoutFunction<String, String>}.
 *
 * @param <T> type of the events contained in the timed-out partial match
 */
public static class TimedOut<T> implements PatternFlatTimeoutFunction<T, String> {
    /**
     * Emits "LATE!" to the collector; the partial match and timestamp are not inspected.
     *
     * @param pattern events of the partial match, keyed by pattern stage name (unused)
     * @param timeoutTimestamp timestamp passed in by the caller (unused)
     * @param out collector that receives the alert text
     * @throws Exception declared by the implemented interface; not thrown by this body
     */
    @Override
    public void timeout(Map<String, List<T>> pattern, long timeoutTimestamp, Collector<String> out) throws Exception {
        out.collect("LATE!");
    }
}
/**
 * Select function that ignores every match handed to it: nothing is ever written to the
 * collector.
 *
 * @param <T> type of the events in the match (and nominal output type)
 */
public static class FlatSelectNothing<T> implements PatternFlatSelectFunction<T, T> {
    /**
     * Does nothing with the given match.
     *
     * @param pattern matched events keyed by pattern stage name (unused)
     * @param collector output collector (never written to)
     */
    @Override
    public void flatSelect(Map<String, List<T>> pattern, Collector<T> collector) {
        // Intentionally empty: this function produces no output.
    }
}
实际行为:
publish "A"
(wait 5 seconds)
publish "B"
=> (no alert)
publish "A"
(wait 10 seconds)
=> (no alert, but should be)
publish "A"
(wait 10 seconds)
publish "B"
=> "LATE!"
预期行为:
publish "A"
(wait 10 seconds)
=> "LATE!"
您可以通过超时模式来完成。您可以指定模式,如
A followedBy B within 10 seconds
并检查超时的模式,也就是只匹配到 A 的部分匹配。关于超时模式的处理方式,您可以查看文档(原文此处为链接)。
有关完整示例,您可以参考此培训教程(training),或者直接查看该练习(exercise)的解答。
编辑:
目前(Flink < 1.5)在处理时间(processing time)下,对过期部分匹配的清理只会在有新元素到达时进行。因此很遗憾,超时之后必须至少再到达一个事件(无论它是否匹配),超时才会被触发。
改进这一行为的工作可以通过此 JIRA ticket 跟踪。
你可以试试下面的解决方案吗?
package com.nirav.modi.cep;
import com.nirav.modi.dto.Event;
import org.apache.flink.cep.CEP;
import org.apache.flink.cep.PatternFlatSelectFunction;
import org.apache.flink.cep.PatternFlatTimeoutFunction;
import org.apache.flink.cep.PatternStream;
import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.cep.pattern.conditions.SimpleCondition;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;
import java.util.List;
import java.util.Map;
/**
 * Demo job: a scripted source emits A/B events with pauses between them, and a CEP pattern
 * "A then B within 10 seconds" is evaluated so that both complete matches and timed-out
 * partial matches can be observed on standard output.
 */
public class EventNotOccur {

    /**
     * Builds and runs the job.
     *
     * <p>Two select operators are attached to the same pattern stream: the first prints a
     * fixed message per complete match; the second routes timed-out partial matches to the
     * "timedout" side output, which is printed.
     *
     * @param args command-line arguments (unused)
     * @throws Exception if building or executing the job fails
     */
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStreamSource<Event> source = env.addSource(new SourceFunction<Event>() {
            /**
             * Emits the scripted sequence once: A, (5 s) B, (5 s) A, (15 s) B, (5 s) B.
             * The 15 s gap after the second A exceeds the 10 s window used below.
             */
            @Override
            public void run(SourceContext<Event> ctx) throws Exception {
                ctx.collect(new Event("A"));
                Thread.sleep(5000);
                ctx.collect(new Event("B"));
                Thread.sleep(5000);
                ctx.collect(new Event("A"));
                Thread.sleep(15000);
                ctx.collect(new Event("B"));
                Thread.sleep(5000);
                ctx.collect(new Event("B"));
            }

            @Override
            public void cancel() {
                // No cancellation flag is kept: run() emits a fixed, finite sequence and returns.
            }
        });

        // "start" must be named A, "end" must be named B.
        Pattern<Event, ?> simplePattern =
                Pattern.<Event>begin("start")
                        .where(new SimpleCondition<Event>() {
                            @Override
                            public boolean filter(Event event) {
                                return event.getName().equals("A");
                            }
                        })
                        .next("end")
                        .where(new SimpleCondition<Event>() {
                            @Override
                            public boolean filter(Event event) {
                                return event.getName().equals("B");
                            }
                        });

        source.print();

        PatternStream<Event> timedOutPatternStream =
                CEP.pattern(source, simplePattern.within(Time.seconds(10)));

        // Anonymous subclass, as in the usual OutputTag idiom.
        OutputTag<Event> timedout = new OutputTag<Event>("timedout") {
        };

        // First select: print a marker line for every complete match.
        timedOutPatternStream.flatSelect(new PatternFlatSelectFunction<Event, String>() {
            @Override
            public void flatSelect(Map<String, List<Event>> pattern, Collector<String> out) throws Exception {
                out.collect("Pattern Match...............");
            }
        }).print();

        // Second select: explicit type arguments instead of raw types, so the call is
        // checked by the compiler and the result is a properly typed stream.
        SingleOutputStreamOperator<Event> selected = timedOutPatternStream
                .flatSelect(
                        timedout,
                        new EventTimeOut<Event>(),
                        new FlatSelectNothing<Event>()
                );

        // Only the side output (timed-out partial matches) is printed.
        selected.getSideOutput(timedout).print();

        env.execute("Flink Streaming Java API Skeleton");
    }

    /**
     * Timeout handler that logs and forwards the first "start" event of a timed-out
     * partial match.
     *
     * @param <T> event type; named {@code T} so it does not shadow the imported
     *            {@code Event} class
     */
    public static class EventTimeOut<T> implements PatternFlatTimeoutFunction<T, T> {
        /**
         * @param map events of the partial match keyed by pattern stage name; the "start"
         *            entry is read
         * @param l timestamp passed in by the caller (unused)
         * @param collector receives the first "start" event
         * @throws Exception declared by the implemented interface
         */
        @Override
        public void timeout(Map<String, List<T>> map, long l, Collector<T> collector) throws Exception {
            T startEvent = map.get("start").get(0);
            System.out.println("Time out Partial Event : " + startEvent);
            collector.collect(startEvent);
        }
    }

    /**
     * Select function for complete matches. Despite its name it is not a no-op: it logs
     * and emits the first "start" event of each match.
     *
     * @param <T> event type
     */
    public static class FlatSelectNothing<T> implements PatternFlatSelectFunction<T, T> {
        /**
         * @param pattern matched events keyed by pattern stage name; the "start" entry is read
         * @param collector receives the first "start" event
         */
        @Override
        public void flatSelect(Map<String, List<T>> pattern, Collector<T> collector) {
            System.out.println("Flat select nothing: " + pattern.get("start").get(0));
            collector.collect(pattern.get("start").get(0));
        }
    }
}
例如,A 之后应该在 10 秒内出现 B。我知道如何检测这种情况确实发生了(.next,.within),但如果 B 始终没有在时间窗口(window)
内发生,我想发送警报 public static void main(String[] args) throws Exception {
// Purpose: read strings from the RabbitMQ queue "cep", look for an "A" followed (via next)
// by a "B" within 10 seconds, and print what the timeout handler emits on the "timedout"
// side output.
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// checkpointing is required for exactly-once or at-least-once guarantees
// env.enableCheckpointing(1000);
// Connection settings for a local RabbitMQ broker with the guest account.
final RMQConnectionConfig connectionConfig = new RMQConnectionConfig.Builder()
.setHost("localhost")
.setPort(5672)
.setVirtualHost("/")
.setUserName("guest")
.setPassword("guest")
.build();
final DataStream<String> inputStream = env
.addSource(new RMQSource<String>(
connectionConfig, // config for the RabbitMQ connection
"cep", // name of the RabbitMQ queue to consume
true, // use correlation ids; can be false if only at-least-once is required
new SimpleStringSchema())) // deserialization schema to turn messages into Java objects
.setParallelism(1); // non-parallel source is only required for exactly-once
// Echo every incoming message.
inputStream.print();
// Two-stage pattern: "start" accepts the string "A", "end" accepts the string "B".
Pattern<String, ?> simplePattern =
Pattern.<String>begin("start")
.where(new SimpleCondition<String>() {
@Override
public boolean filter(String event) {
return event.equals("A");
}
})
.next("end")
.where(new SimpleCondition<String>() {
@Override
public boolean filter(String event) {
return event.equals("B");
}
});
// The 10-second limit is attached here, when the pattern is applied to the stream.
PatternStream<String> timedOutPatternStream = CEP.pattern(inputStream, simplePattern.within(Time.seconds(10)));
// Anonymous subclass, as in the usual OutputTag idiom.
OutputTag<String> timedout = new OutputTag<String>("timedout"){};
// Timeout handler output goes to the "timedout" side output; the select function handles
// complete matches.
SingleOutputStreamOperator<String> timedOutNotificationsStream = timedOutPatternStream.flatSelect(
timedout,
new TimedOut<String>(),
new FlatSelectNothing<String>()
);
// Only the side output is printed.
timedOutNotificationsStream.getSideOutput(timedout).print();
env.execute("mynotification");
}
/**
 * Timeout handler that emits the fixed alert text "LATE!" each time it is invoked.
 *
 * <p>The element type parameter is deliberately named {@code T}: naming it {@code String}
 * would shadow {@code java.lang.String} inside this class and force an unchecked cast on
 * the alert literal. The output type is the real {@code java.lang.String}, so
 * {@code new TimedOut<String>()} still yields a
 * {@code PatternFlatTimeoutFunction<String, String>}.
 *
 * @param <T> type of the events contained in the timed-out partial match
 */
public static class TimedOut<T> implements PatternFlatTimeoutFunction<T, String> {
    /**
     * Emits "LATE!" to the collector; the partial match and timestamp are not inspected.
     *
     * @param pattern events of the partial match, keyed by pattern stage name (unused)
     * @param timeoutTimestamp timestamp passed in by the caller (unused)
     * @param out collector that receives the alert text
     * @throws Exception declared by the implemented interface; not thrown by this body
     */
    @Override
    public void timeout(Map<String, List<T>> pattern, long timeoutTimestamp, Collector<String> out) throws Exception {
        out.collect("LATE!");
    }
}
/**
 * Select function that ignores every match handed to it: nothing is ever written to the
 * collector.
 *
 * @param <T> type of the events in the match (and nominal output type)
 */
public static class FlatSelectNothing<T> implements PatternFlatSelectFunction<T, T> {
    /**
     * Does nothing with the given match.
     *
     * @param pattern matched events keyed by pattern stage name (unused)
     * @param collector output collector (never written to)
     */
    @Override
    public void flatSelect(Map<String, List<T>> pattern, Collector<T> collector) {
        // Intentionally empty: this function produces no output.
    }
}
实际行为:
publish "A"
(wait 5 seconds)
publish "B"
=> (no alert)
publish "A"
(wait 10 seconds)
=> (no alert, but should be)
publish "A"
(wait 10 seconds)
publish "B"
=> "LATE!"
预期行为:
publish "A"
(wait 10 seconds)
=> "LATE!"
您可以通过超时模式来完成。您可以指定模式,如
A followedBy B within 10 seconds
并检查超时的模式,也就是只匹配到 A 的部分匹配。关于超时模式的处理方式,您可以查看文档(原文此处为链接)。
有关完整示例,您可以参考此培训教程(training),或者直接查看该练习(exercise)的解答。
编辑: 目前(Flink < 1.5)在处理时间(processing time)下,对过期部分匹配的清理只会在有新元素到达时进行。因此很遗憾,超时之后必须至少再到达一个事件(无论它是否匹配),超时才会被触发。改进这一行为的工作可以通过此 JIRA ticket 跟踪。
你可以试试下面的解决方案吗?
package com.nirav.modi.cep;
import com.nirav.modi.dto.Event;
import org.apache.flink.cep.CEP;
import org.apache.flink.cep.PatternFlatSelectFunction;
import org.apache.flink.cep.PatternFlatTimeoutFunction;
import org.apache.flink.cep.PatternStream;
import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.cep.pattern.conditions.SimpleCondition;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;
import java.util.List;
import java.util.Map;
/**
 * Demo job: a scripted source emits A/B events with pauses between them, and a CEP pattern
 * "A then B within 10 seconds" is evaluated so that both complete matches and timed-out
 * partial matches can be observed on standard output.
 */
public class EventNotOccur {

    /**
     * Builds and runs the job.
     *
     * <p>Two select operators are attached to the same pattern stream: the first prints a
     * fixed message per complete match; the second routes timed-out partial matches to the
     * "timedout" side output, which is printed.
     *
     * @param args command-line arguments (unused)
     * @throws Exception if building or executing the job fails
     */
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStreamSource<Event> source = env.addSource(new SourceFunction<Event>() {
            /**
             * Emits the scripted sequence once: A, (5 s) B, (5 s) A, (15 s) B, (5 s) B.
             * The 15 s gap after the second A exceeds the 10 s window used below.
             */
            @Override
            public void run(SourceContext<Event> ctx) throws Exception {
                ctx.collect(new Event("A"));
                Thread.sleep(5000);
                ctx.collect(new Event("B"));
                Thread.sleep(5000);
                ctx.collect(new Event("A"));
                Thread.sleep(15000);
                ctx.collect(new Event("B"));
                Thread.sleep(5000);
                ctx.collect(new Event("B"));
            }

            @Override
            public void cancel() {
                // No cancellation flag is kept: run() emits a fixed, finite sequence and returns.
            }
        });

        // "start" must be named A, "end" must be named B.
        Pattern<Event, ?> simplePattern =
                Pattern.<Event>begin("start")
                        .where(new SimpleCondition<Event>() {
                            @Override
                            public boolean filter(Event event) {
                                return event.getName().equals("A");
                            }
                        })
                        .next("end")
                        .where(new SimpleCondition<Event>() {
                            @Override
                            public boolean filter(Event event) {
                                return event.getName().equals("B");
                            }
                        });

        source.print();

        PatternStream<Event> timedOutPatternStream =
                CEP.pattern(source, simplePattern.within(Time.seconds(10)));

        // Anonymous subclass, as in the usual OutputTag idiom.
        OutputTag<Event> timedout = new OutputTag<Event>("timedout") {
        };

        // First select: print a marker line for every complete match.
        timedOutPatternStream.flatSelect(new PatternFlatSelectFunction<Event, String>() {
            @Override
            public void flatSelect(Map<String, List<Event>> pattern, Collector<String> out) throws Exception {
                out.collect("Pattern Match...............");
            }
        }).print();

        // Second select: explicit type arguments instead of raw types, so the call is
        // checked by the compiler and the result is a properly typed stream.
        SingleOutputStreamOperator<Event> selected = timedOutPatternStream
                .flatSelect(
                        timedout,
                        new EventTimeOut<Event>(),
                        new FlatSelectNothing<Event>()
                );

        // Only the side output (timed-out partial matches) is printed.
        selected.getSideOutput(timedout).print();

        env.execute("Flink Streaming Java API Skeleton");
    }

    /**
     * Timeout handler that logs and forwards the first "start" event of a timed-out
     * partial match.
     *
     * @param <T> event type; named {@code T} so it does not shadow the imported
     *            {@code Event} class
     */
    public static class EventTimeOut<T> implements PatternFlatTimeoutFunction<T, T> {
        /**
         * @param map events of the partial match keyed by pattern stage name; the "start"
         *            entry is read
         * @param l timestamp passed in by the caller (unused)
         * @param collector receives the first "start" event
         * @throws Exception declared by the implemented interface
         */
        @Override
        public void timeout(Map<String, List<T>> map, long l, Collector<T> collector) throws Exception {
            T startEvent = map.get("start").get(0);
            System.out.println("Time out Partial Event : " + startEvent);
            collector.collect(startEvent);
        }
    }

    /**
     * Select function for complete matches. Despite its name it is not a no-op: it logs
     * and emits the first "start" event of each match.
     *
     * @param <T> event type
     */
    public static class FlatSelectNothing<T> implements PatternFlatSelectFunction<T, T> {
        /**
         * @param pattern matched events keyed by pattern stage name; the "start" entry is read
         * @param collector receives the first "start" event
         */
        @Override
        public void flatSelect(Map<String, List<T>> pattern, Collector<T> collector) {
            System.out.println("Flat select nothing: " + pattern.get("start").get(0));
            collector.collect(pattern.get("start").get(0));
        }
    }
}