如何使用 Flink CEP 创建 batch 或 slide windows?
How to create batch or slide windows using Flink CEP?
我刚从 Flink CEP
开始,我来自 Esper CEP
引擎。您可能(或不)知道,在Esper
中使用他们的语法(EPL
)您可以创建一个batch
或slide
window 轻松地将这些事件分组 windows 并允许您将此事件与函数(平均、最大、最小、...)一起使用。
例如,使用以下模式,您可以创建一个 5 秒的批处理 windows,并计算您拥有的所有 Stock
事件的属性 price
的平均值在指定的 window.
中收到
select avg(price) from Stock#time_batch(5 sec)
问题是我想知道如何在 Flink CEP
上实现它。我知道 Flink CEP
中的目标或方法可能不同,因此实现它的方法可能不像 Esper CEP
.
中那么简单
我已经查看了关于时间 windows 的 docs,但我无法将此 windows 与 Flink CEP
一起实施。因此,给定以下代码:
DataStream<Stock> stream = ...; // Consume events from Kafka
// Filtering events with negative price
Pattern<Stock, ?> pattern = Pattern.<Stock>begin("start")
.where(
new SimpleCondition<Stock>() {
public boolean filter(Stock event) {
return event.getPrice() >= 0;
}
}
);
PatternStream<Stock> patternStream = CEP.pattern(stream, pattern);
/**
CREATE A BATCH WINDOW OF 5 SECONDS IN WHICH
I COMPUTE OVER THE AVERAGE PRICES AND, IF IT IS
GREATER THAN A THREESHOLD, AN ALERT IS DETECTED
return avg(allEventsInWindow.getPrice()) > 1;
*/
DataStream<Alert> result = patternStream.select(
new PatternSelectFunction<Stock, Alert>() {
@Override
public Alert select(Map<String, List<Stock>> pattern) throws Exception {
return new Alert(pattern.toString());
}
}
);
如何创建 window,在其中,从收到的第一个事件开始,我开始计算 5 秒内以下事件的平均值。例如:
t = 0 seconds
Stock(price = 1); (...starting batch window...)
Stock(price = 1);
Stock(price = 1);
Stock(price = 2);
Stock(price = 2);
Stock(price = 2);
t = 5 seconds (...end of batch window...)
Avg = 1.5 => Alert detected!
5 秒后的平均值为 1.5,将触发警报。我该如何编码?
谢谢!
使用 Flink 的 CEP 库无法表达这种行为。我宁愿推荐使用 Flink 的 DataStream
或 Table API 来计算平均值。基于此,您可以再次使用 CEP 生成其他事件。
final DataStream<Stock> input = env
.fromElements(
new Stock(1L, 1.0),
new Stock(2L, 2.0),
new Stock(3L, 1.0),
new Stock(4L, 2.0))
.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<Stock>(Time.seconds(0L)) {
@Override
public long extractTimestamp(Stock element) {
return element.getTimestamp();
}
});
final DataStream<Double> windowAggregation = input
.timeWindowAll(Time.milliseconds(2))
.aggregate(new AggregateFunction<Stock, Tuple2<Integer, Double>, Double>() {
@Override
public Tuple2<Integer, Double> createAccumulator() {
return Tuple2.of(0, 0.0);
}
@Override
public Tuple2<Integer, Double> add(Stock value, Tuple2<Integer, Double> accumulator) {
return Tuple2.of(accumulator.f0 + 1, accumulator.f1 + value.getValue());
}
@Override
public Double getResult(Tuple2<Integer, Double> accumulator) {
return accumulator.f1 / accumulator.f0;
}
@Override
public Tuple2<Integer, Double> merge(Tuple2<Integer, Double> a, Tuple2<Integer, Double> b) {
return Tuple2.of(a.f0 + b.f0, a.f1 + b.f1);
}
});
final DataStream<Double> result = windowAggregation.filter((FilterFunction<Double>) value -> value > THRESHOLD);
我刚从 Flink CEP
开始,我来自 Esper CEP
引擎。您可能(或不)知道,在Esper
中使用他们的语法(EPL
)您可以创建一个batch
或slide
window 轻松地将这些事件分组 windows 并允许您将此事件与函数(平均、最大、最小、...)一起使用。
例如,使用以下模式,您可以创建一个 5 秒的批处理 windows,并计算您拥有的所有 Stock
事件的属性 price
的平均值在指定的 window.
select avg(price) from Stock#time_batch(5 sec)
问题是我想知道如何在 Flink CEP
上实现它。我知道 Flink CEP
中的目标或方法可能不同,因此实现它的方法可能不像 Esper CEP
.
我已经查看了关于时间 windows 的 docs,但我无法将此 windows 与 Flink CEP
一起实施。因此,给定以下代码:
DataStream<Stock> stream = ...; // Consume events from Kafka
// Filtering events with negative price
Pattern<Stock, ?> pattern = Pattern.<Stock>begin("start")
.where(
new SimpleCondition<Stock>() {
public boolean filter(Stock event) {
return event.getPrice() >= 0;
}
}
);
PatternStream<Stock> patternStream = CEP.pattern(stream, pattern);
/**
CREATE A BATCH WINDOW OF 5 SECONDS IN WHICH
I COMPUTE OVER THE AVERAGE PRICES AND, IF IT IS
GREATER THAN A THREESHOLD, AN ALERT IS DETECTED
return avg(allEventsInWindow.getPrice()) > 1;
*/
DataStream<Alert> result = patternStream.select(
new PatternSelectFunction<Stock, Alert>() {
@Override
public Alert select(Map<String, List<Stock>> pattern) throws Exception {
return new Alert(pattern.toString());
}
}
);
如何创建 window,在其中,从收到的第一个事件开始,我开始计算 5 秒内以下事件的平均值。例如:
t = 0 seconds
Stock(price = 1); (...starting batch window...)
Stock(price = 1);
Stock(price = 1);
Stock(price = 2);
Stock(price = 2);
Stock(price = 2);
t = 5 seconds (...end of batch window...)
Avg = 1.5 => Alert detected!
5 秒后的平均值为 1.5,将触发警报。我该如何编码?
谢谢!
使用 Flink 的 CEP 库无法表达这种行为。我宁愿推荐使用 Flink 的 DataStream
或 Table API 来计算平均值。基于此,您可以再次使用 CEP 生成其他事件。
final DataStream<Stock> input = env
.fromElements(
new Stock(1L, 1.0),
new Stock(2L, 2.0),
new Stock(3L, 1.0),
new Stock(4L, 2.0))
.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<Stock>(Time.seconds(0L)) {
@Override
public long extractTimestamp(Stock element) {
return element.getTimestamp();
}
});
final DataStream<Double> windowAggregation = input
.timeWindowAll(Time.milliseconds(2))
.aggregate(new AggregateFunction<Stock, Tuple2<Integer, Double>, Double>() {
@Override
public Tuple2<Integer, Double> createAccumulator() {
return Tuple2.of(0, 0.0);
}
@Override
public Tuple2<Integer, Double> add(Stock value, Tuple2<Integer, Double> accumulator) {
return Tuple2.of(accumulator.f0 + 1, accumulator.f1 + value.getValue());
}
@Override
public Double getResult(Tuple2<Integer, Double> accumulator) {
return accumulator.f1 / accumulator.f0;
}
@Override
public Tuple2<Integer, Double> merge(Tuple2<Integer, Double> a, Tuple2<Integer, Double> b) {
return Tuple2.of(a.f0 + b.f0, a.f1 + b.f1);
}
});
final DataStream<Double> result = windowAggregation.filter((FilterFunction<Double>) value -> value > THRESHOLD);