Apache Flink CEP如何根据事件值及时传递window?
Apache Flink CEP how to pass in time window based on event value?
Pattern<Event, ?> pattern = Pattern.<Event>begin("start")
.next("middle").where(new SimpleCondition<Event>() {
@Override
public boolean filter(Event value) throws Exception {
return value.getName().equals("error");
}
}).followedBy("end").where(new SimpleCondition<Event>() {
@Override
public boolean filter(Event value) throws Exception {
return value.getName().equals("critical");
}
}).within(Time.seconds(10));
有什么方法可以将 Time.seconds(10)
替换为通过 Event
传入的 value.getSomeTimeField()
?
我猜您想以事件时间方式工作。有关它的更多信息,您可以查看有关如何从元素中提取时间戳的 docs and this 部分。
在您的示例中,您可以这样做:
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
DataStream<Event> input = ...
input.assignTimestampsAndWatermarks(new AscendingTimestampExtractor<Event>() {
@Override
public long extractAscendingTimestamp(MyEvent element) {
return value.getSomeTimeField();
}
})
CEP.pattern(input, pattern).select(...)
这样事件将在流中自动排序,并且超时将在两种情况下都适用于时间字段。
Pattern<Event, ?> pattern = Pattern.<Event>begin("start")
.next("middle").where(new SimpleCondition<Event>() {
@Override
public boolean filter(Event value) throws Exception {
return value.getName().equals("error");
}
}).followedBy("end").where(new SimpleCondition<Event>() {
@Override
public boolean filter(Event value) throws Exception {
return value.getName().equals("critical");
}
}).within(Time.seconds(10));
有什么方法可以将 Time.seconds(10)
替换为通过 Event
传入的 value.getSomeTimeField()
?
我猜您想以事件时间方式工作。有关它的更多信息,您可以查看有关如何从元素中提取时间戳的 docs and this 部分。
在您的示例中,您可以这样做:
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
DataStream<Event> input = ...
input.assignTimestampsAndWatermarks(new AscendingTimestampExtractor<Event>() {
@Override
public long extractAscendingTimestamp(MyEvent element) {
return value.getSomeTimeField();
}
})
CEP.pattern(input, pattern).select(...)
这样事件将在流中自动排序,并且超时将在两种情况下都适用于时间字段。