FlinkCEP 模式检测不是实时发生的
FlinkCEP pattern detection doesn't happen in real time
我还是 Flink CEP 库的新手,但我不了解模式检测行为。
考虑下面的示例,我有一个 Flink 应用程序使用来自 kafka 主题的数据,数据是定期生成的,我想使用 Flink CEP 模式来检测值何时大于给定阈值。
代码如下:
public class CEPJob{
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "localhost:9092");
properties.setProperty("group.id", "test");
FlinkKafkaConsumer<String> consumer = new FlinkKafkaConsumer<String>("test", new SimpleStringSchema(),
properties);
consumer.assignTimestampsAndWatermarks(WatermarkStrategy.forMonotonousTimestamps());
DataStream<String> stream = env.addSource(consumer);
// Process incoming data.
DataStream<Stock> inputEventStream = stream.map(new MapFunction<String, Stock>() {
private static final long serialVersionUID = -491668877013085114L;
@Override
public Stock map(String value) {
String[] data = value.split(":");
System.out.println("Date: " + data[0] + ", Adj Close: " + data[1]);
Stock stock = new Stock(data[0], Double.parseDouble(data[1]));
return stock;
}
});
// Create the pattern
Pattern<Stock, ?> myPattern = Pattern.<Stock>begin("first").where(new SimpleCondition<Stock>() {
private static final long serialVersionUID = -6301755149429716724L;
@Override
public boolean filter(Stock value) throws Exception {
return (value.getAdj_Close() > 140.0);
}
});
// Create a pattern stream from our warning pattern
PatternStream<Stock> myPatternStream = CEP.pattern(inputEventStream, myPattern);
// Generate alert for each matched pattern
DataStream<Stock> warnings = myPatternStream .select((Map<String, List<Stock>> pattern) -> {
Stock first = pattern.get("first").get(0);
return first;
});
warnings.print();
env.execute("CEP job");
}
}
当我运行作业时会发生什么,模式检测不是实时发生的,它仅在产生第二条记录后才输出当前记录检测到模式的警告,看起来就像它延迟打印到日志警告一样,我真的不明白如何让它在检测到模式时输出警告而不等待下一条记录,谢谢 :) .
来自 Kafka 的数据是字符串格式:“date:value”,它每 5 秒产生一次数据。
Java版本:1.8,Scala版本:2.11.12,Flink版本:1.12.2,Kafka版本:2.3.0
我发现的解决方案是每次我为主题生成一个值时在 Kafka 主题中发送一个假记录(例如一个空对象),并且在 Flink 端(在模式声明中)我测试是否收到的记录是否是假的。
FlinkCEP 似乎总是在等待即将发生的事件,然后才输出警告。
我还是 Flink CEP 库的新手,但我不了解模式检测行为。 考虑下面的示例,我有一个 Flink 应用程序使用来自 kafka 主题的数据,数据是定期生成的,我想使用 Flink CEP 模式来检测值何时大于给定阈值。 代码如下:
public class CEPJob{
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "localhost:9092");
properties.setProperty("group.id", "test");
FlinkKafkaConsumer<String> consumer = new FlinkKafkaConsumer<String>("test", new SimpleStringSchema(),
properties);
consumer.assignTimestampsAndWatermarks(WatermarkStrategy.forMonotonousTimestamps());
DataStream<String> stream = env.addSource(consumer);
// Process incoming data.
DataStream<Stock> inputEventStream = stream.map(new MapFunction<String, Stock>() {
private static final long serialVersionUID = -491668877013085114L;
@Override
public Stock map(String value) {
String[] data = value.split(":");
System.out.println("Date: " + data[0] + ", Adj Close: " + data[1]);
Stock stock = new Stock(data[0], Double.parseDouble(data[1]));
return stock;
}
});
// Create the pattern
Pattern<Stock, ?> myPattern = Pattern.<Stock>begin("first").where(new SimpleCondition<Stock>() {
private static final long serialVersionUID = -6301755149429716724L;
@Override
public boolean filter(Stock value) throws Exception {
return (value.getAdj_Close() > 140.0);
}
});
// Create a pattern stream from our warning pattern
PatternStream<Stock> myPatternStream = CEP.pattern(inputEventStream, myPattern);
// Generate alert for each matched pattern
DataStream<Stock> warnings = myPatternStream .select((Map<String, List<Stock>> pattern) -> {
Stock first = pattern.get("first").get(0);
return first;
});
warnings.print();
env.execute("CEP job");
}
}
当我运行作业时会发生什么,模式检测不是实时发生的,它仅在产生第二条记录后才输出当前记录检测到模式的警告,看起来就像它延迟打印到日志警告一样,我真的不明白如何让它在检测到模式时输出警告而不等待下一条记录,谢谢 :) .
来自 Kafka 的数据是字符串格式:“date:value”,它每 5 秒产生一次数据。
Java版本:1.8,Scala版本:2.11.12,Flink版本:1.12.2,Kafka版本:2.3.0
我发现的解决方案是每次我为主题生成一个值时在 Kafka 主题中发送一个假记录(例如一个空对象),并且在 Flink 端(在模式声明中)我测试是否收到的记录是否是假的。 FlinkCEP 似乎总是在等待即将发生的事件,然后才输出警告。