为什么 CEP 在使用 ProcessingTime 时仅在我输入第二个事件后才打印第一个事件?
Why does CEP print the first event only after I input the second event when using ProcessingTime?
我向 Kafka 发送了一个 isStart 为 true 的事件,并让 Flink 从 Kafka 消费该事件。我将 TimeCharacteristic 设置为 ProcessingTime,并为模式设置了 within(Time.seconds(5)),所以我预计 CEP 会在我发送第一个事件 5 秒后打印该事件。但事实并非如此:只有在我向 Kafka 发送第二个事件之后,它才打印第一个事件。为什么要等到发送了两个事件之后才打印第一个事件?既然使用的是 ProcessingTime,难道不应该在我发送第一个事件 5 秒后就打印吗?
代码如下:
public class LongRidesWithKafka {
private static final String LOCAL_ZOOKEEPER_HOST = "localhost:2181";
private static final String LOCAL_KAFKA_BROKER = "localhost:9092";
private static final String RIDE_SPEED_GROUP = "rideSpeedGroup";
private static final int MAX_EVENT_DELAY = 60; // rides are at most 60 sec out-of-order.
public static void main(String[] args) throws Exception {
final int popThreshold = 1; // threshold for popular places
// set up streaming execution environment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
Properties kafkaProps = new Properties();
//kafkaProps.setProperty("zookeeper.connect", LOCAL_ZOOKEEPER_HOST);
kafkaProps.setProperty("bootstrap.servers", LOCAL_KAFKA_BROKER);
kafkaProps.setProperty("group.id", RIDE_SPEED_GROUP);
// always read the Kafka topic from the start
kafkaProps.setProperty("auto.offset.reset", "earliest");
// create a Kafka consumer
FlinkKafkaConsumer011<TaxiRide> consumer = new FlinkKafkaConsumer011<>(
"flinktest",
new TaxiRideSchema(),
kafkaProps);
// assign a timestamp extractor to the consumer
//consumer.assignTimestampsAndWatermarks(new CustomWatermarkExtractor());
DataStream<TaxiRide> rides = env.addSource(consumer);
DataStream<TaxiRide> keyedRides = rides.keyBy("rideId");
// A complete taxi ride has a START event followed by an END event
Pattern<TaxiRide, TaxiRide> completedRides =
Pattern.<TaxiRide>begin("start")
.where(new SimpleCondition<TaxiRide>() {
@Override
public boolean filter(TaxiRide ride) throws Exception {
return ride.isStart;
}
})
.next("end")
.where(new SimpleCondition<TaxiRide>() {
@Override
public boolean filter(TaxiRide ride) throws Exception {
return !ride.isStart;
}
});
// We want to find rides that have NOT been completed within 120 minutes
PatternStream<TaxiRide> patternStream = CEP.pattern(keyedRides, completedRides.within(Time.seconds(5)));
OutputTag<TaxiRide> timedout = new OutputTag<TaxiRide>("timedout") {
};
SingleOutputStreamOperator<TaxiRide> longRides = patternStream.flatSelect(
timedout,
new LongRides.TaxiRideTimedOut<TaxiRide>(),
new LongRides.FlatSelectNothing<TaxiRide>()
);
longRides.getSideOutput(timedout).print();
env.execute("Long Taxi Rides");
}
public static class TaxiRideTimedOut<TaxiRide> implements PatternFlatTimeoutFunction<TaxiRide, TaxiRide> {
@Override
public void timeout(Map<String, List<TaxiRide>> map, long l, Collector<TaxiRide> collector) throws Exception {
TaxiRide rideStarted = map.get("start").get(0);
collector.collect(rideStarted);
}
}
public static class FlatSelectNothing<T> implements PatternFlatSelectFunction<T, T> {
@Override
public void flatSelect(Map<String, List<T>> pattern, Collector<T> collector) {
}
}
private static class TaxiRideTSExtractor extends AscendingTimestampExtractor<TaxiRide> {
private static final long serialVersionUID = 1L;
@Override
public long extractAscendingTimestamp(TaxiRide ride) {
// Watermark Watermark = getCurrentWatermark();
if (ride.isStart) {
return ride.startTime.getMillis();
} else {
return ride.endTime.getMillis();
}
}
}
private static class CustomWatermarkExtractor implements AssignerWithPeriodicWatermarks<TaxiRide> {
private static final long serialVersionUID = -742759155861320823L;
private long currentTimestamp = Long.MIN_VALUE;
@Override
public long extractTimestamp(TaxiRide ride, long previousElementTimestamp) {
// the inputs are assumed to be of format (message,timestamp)
if (ride.isStart) {
this.currentTimestamp = ride.startTime.getMillis();
return ride.startTime.getMillis();
} else {
this.currentTimestamp = ride.endTime.getMillis();
return ride.endTime.getMillis();
}
}
@Nullable
@Override
public Watermark getCurrentWatermark() {
return new Watermark(currentTimestamp == Long.MIN_VALUE ? Long.MIN_VALUE : currentTimestamp - 1);
}
}
}
原因是 Flink 的 CEP 库目前仅在另一个元素到达并被处理时才检查时间戳。基本假设是您有稳定的事件流。
我认为这是 Flink 的 CEP 库的一个限制。为了正常工作,Flink 应该使用 arrivalTime + timeout
注册处理时间计时器,如果没有事件到达,它会触发模式超时。
我向 Kafka 发送了一个 isStart 为 true 的事件,并让 Flink 从 Kafka 消费该事件。我将 TimeCharacteristic 设置为 ProcessingTime,并为模式设置了 within(Time.seconds(5)),所以我预计 CEP 会在我发送第一个事件 5 秒后打印该事件。但事实并非如此:只有在我向 Kafka 发送第二个事件之后,它才打印第一个事件。为什么要等到发送了两个事件之后才打印第一个事件?既然使用的是 ProcessingTime,难道不应该在我发送第一个事件 5 秒后就打印吗?
代码如下:
public class LongRidesWithKafka {
private static final String LOCAL_ZOOKEEPER_HOST = "localhost:2181";
private static final String LOCAL_KAFKA_BROKER = "localhost:9092";
private static final String RIDE_SPEED_GROUP = "rideSpeedGroup";
private static final int MAX_EVENT_DELAY = 60; // rides are at most 60 sec out-of-order.
public static void main(String[] args) throws Exception {
final int popThreshold = 1; // threshold for popular places
// set up streaming execution environment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
Properties kafkaProps = new Properties();
//kafkaProps.setProperty("zookeeper.connect", LOCAL_ZOOKEEPER_HOST);
kafkaProps.setProperty("bootstrap.servers", LOCAL_KAFKA_BROKER);
kafkaProps.setProperty("group.id", RIDE_SPEED_GROUP);
// always read the Kafka topic from the start
kafkaProps.setProperty("auto.offset.reset", "earliest");
// create a Kafka consumer
FlinkKafkaConsumer011<TaxiRide> consumer = new FlinkKafkaConsumer011<>(
"flinktest",
new TaxiRideSchema(),
kafkaProps);
// assign a timestamp extractor to the consumer
//consumer.assignTimestampsAndWatermarks(new CustomWatermarkExtractor());
DataStream<TaxiRide> rides = env.addSource(consumer);
DataStream<TaxiRide> keyedRides = rides.keyBy("rideId");
// A complete taxi ride has a START event followed by an END event
Pattern<TaxiRide, TaxiRide> completedRides =
Pattern.<TaxiRide>begin("start")
.where(new SimpleCondition<TaxiRide>() {
@Override
public boolean filter(TaxiRide ride) throws Exception {
return ride.isStart;
}
})
.next("end")
.where(new SimpleCondition<TaxiRide>() {
@Override
public boolean filter(TaxiRide ride) throws Exception {
return !ride.isStart;
}
});
// We want to find rides that have NOT been completed within 120 minutes
PatternStream<TaxiRide> patternStream = CEP.pattern(keyedRides, completedRides.within(Time.seconds(5)));
OutputTag<TaxiRide> timedout = new OutputTag<TaxiRide>("timedout") {
};
SingleOutputStreamOperator<TaxiRide> longRides = patternStream.flatSelect(
timedout,
new LongRides.TaxiRideTimedOut<TaxiRide>(),
new LongRides.FlatSelectNothing<TaxiRide>()
);
longRides.getSideOutput(timedout).print();
env.execute("Long Taxi Rides");
}
public static class TaxiRideTimedOut<TaxiRide> implements PatternFlatTimeoutFunction<TaxiRide, TaxiRide> {
@Override
public void timeout(Map<String, List<TaxiRide>> map, long l, Collector<TaxiRide> collector) throws Exception {
TaxiRide rideStarted = map.get("start").get(0);
collector.collect(rideStarted);
}
}
public static class FlatSelectNothing<T> implements PatternFlatSelectFunction<T, T> {
@Override
public void flatSelect(Map<String, List<T>> pattern, Collector<T> collector) {
}
}
private static class TaxiRideTSExtractor extends AscendingTimestampExtractor<TaxiRide> {
private static final long serialVersionUID = 1L;
@Override
public long extractAscendingTimestamp(TaxiRide ride) {
// Watermark Watermark = getCurrentWatermark();
if (ride.isStart) {
return ride.startTime.getMillis();
} else {
return ride.endTime.getMillis();
}
}
}
private static class CustomWatermarkExtractor implements AssignerWithPeriodicWatermarks<TaxiRide> {
private static final long serialVersionUID = -742759155861320823L;
private long currentTimestamp = Long.MIN_VALUE;
@Override
public long extractTimestamp(TaxiRide ride, long previousElementTimestamp) {
// the inputs are assumed to be of format (message,timestamp)
if (ride.isStart) {
this.currentTimestamp = ride.startTime.getMillis();
return ride.startTime.getMillis();
} else {
this.currentTimestamp = ride.endTime.getMillis();
return ride.endTime.getMillis();
}
}
@Nullable
@Override
public Watermark getCurrentWatermark() {
return new Watermark(currentTimestamp == Long.MIN_VALUE ? Long.MIN_VALUE : currentTimestamp - 1);
}
}
}
原因是 Flink 的 CEP 库目前仅在另一个元素到达并被处理时才检查时间戳。基本假设是您有稳定的事件流。
我认为这是 Flink 的 CEP 库的一个限制。为了正常工作,Flink 应该使用 arrivalTime + timeout
注册处理时间计时器,如果没有事件到达,它会触发模式超时。