SlidingEventTimeWindows 不产生任何输出
SlidingEventTimeWindows does not produce any output
我将流执行配置为
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStreamSource<Record> stream = env.addSource(new FlinkKafkaConsumer(
SystemsCpu.TOPIC,
ConfluentRegistryAvroDeserializationSchema.forGeneric(SystemsCpu.SCHEMA, registry),
config)
.setStartFromLatest());
DataStream<Anomaly> anomalies = stream
.keyBy(x -> x.get("host").toString())
.window(SlidingEventTimeWindows.of(Time.minutes(20), Time.seconds(20))) // produces output with TumblingEventTimeWindows
.process(new AnomalyDetector())
.name("anomaly-detector");
public class AnomalyDetector extends ProcessWindowFunction<Record, Anomaly, String, TimeWindow> {
@Override
public void process(String key, Context context, Iterable<Record> input, Collector<Anomaly> out) {
var anomaly = new Anomaly();
anomaly.setValue(1.0);
out.collect(anomaly);
}
}
但是由于某些原因 SlidingEventTimeWindows
没有产生任何输出供 AnomalyDetector
处理(即根本没有触发过程)。例如,如果我使用 TumblingEventTimeWindows
,它会按预期工作。
知道是什么原因造成的吗?我使用 SlidingEventTimeWindows
不正确吗?
您正在使用 SlidingEventTimeWindows
,但您的流执行环境默认配置为处理时间。使用 SlidingProcessingTimeWindows
或像这样
为事件时间配置环境
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
事件时间还需要一个特殊的时间戳分配器,您可以在此处找到更多信息。
https://www.ververica.com/blog/stream-processing-introduction-event-time-apache-flink?hs_amp=true
在进行任何类型的事件时间窗口时,有必要提供 WatermarkStrategy
。水印标记流中的一个点,并表示流已通过某个特定时间点完成。事件时间 windows 只能由足够大的水印到达触发。
有关详细信息,请参阅 the docs,但这可能是这样的:
DataStream<MyType> timestampedEvents = events
.assignTimestampsAndWatermarks(
WatermarkStrategy
.<MyType>forBoundedOutOfOrderness(Duration.ofSeconds(10))
.withTimestampAssigner((event, timestamp) -> event.timestamp));
但是,由于您使用的是 Kafka,通常最好让 Flink Kafka 消费者进行水印处理:
FlinkKafkaConsumer<MyType> kafkaSource = new FlinkKafkaConsumer<>("myTopic", schema, props);
kafkaSource.assignTimestampsAndWatermarks(WatermarkStrategy...);
DataStream<MyType> stream = env.addSource(kafkaSource);
请注意,如果您使用后一种方法,并且您的事件在每个 Kafka 分区中按时间顺序排列,则可以利用 Flink Kafka 源提供的每个分区水印,并使用 WatermarkStrategy.forMonotonousTimestamps()
而不是有序有界策略。这有很多优点。
顺便说一下,这与您的问题无关,但您应该知道,通过指定 SlidingEventTimeWindows.of(Time.minutes(20), Time.seconds(20))
,每个事件都将被复制到 60 个重叠的每个 windows.
我将流执行配置为
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStreamSource<Record> stream = env.addSource(new FlinkKafkaConsumer(
SystemsCpu.TOPIC,
ConfluentRegistryAvroDeserializationSchema.forGeneric(SystemsCpu.SCHEMA, registry),
config)
.setStartFromLatest());
DataStream<Anomaly> anomalies = stream
.keyBy(x -> x.get("host").toString())
.window(SlidingEventTimeWindows.of(Time.minutes(20), Time.seconds(20))) // produces output with TumblingEventTimeWindows
.process(new AnomalyDetector())
.name("anomaly-detector");
public class AnomalyDetector extends ProcessWindowFunction<Record, Anomaly, String, TimeWindow> {
@Override
public void process(String key, Context context, Iterable<Record> input, Collector<Anomaly> out) {
var anomaly = new Anomaly();
anomaly.setValue(1.0);
out.collect(anomaly);
}
}
但是由于某些原因 SlidingEventTimeWindows
没有产生任何输出供 AnomalyDetector
处理(即根本没有触发过程)。例如,如果我使用 TumblingEventTimeWindows
,它会按预期工作。
知道是什么原因造成的吗?我使用 SlidingEventTimeWindows
不正确吗?
您正在使用 SlidingEventTimeWindows
,但您的流执行环境默认配置为处理时间。使用 SlidingProcessingTimeWindows
或像这样
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
事件时间还需要一个特殊的时间戳分配器,您可以在此处找到更多信息。
https://www.ververica.com/blog/stream-processing-introduction-event-time-apache-flink?hs_amp=true
在进行任何类型的事件时间窗口时,有必要提供 WatermarkStrategy
。水印标记流中的一个点,并表示流已通过某个特定时间点完成。事件时间 windows 只能由足够大的水印到达触发。
有关详细信息,请参阅 the docs,但这可能是这样的:
DataStream<MyType> timestampedEvents = events
.assignTimestampsAndWatermarks(
WatermarkStrategy
.<MyType>forBoundedOutOfOrderness(Duration.ofSeconds(10))
.withTimestampAssigner((event, timestamp) -> event.timestamp));
但是,由于您使用的是 Kafka,通常最好让 Flink Kafka 消费者进行水印处理:
FlinkKafkaConsumer<MyType> kafkaSource = new FlinkKafkaConsumer<>("myTopic", schema, props);
kafkaSource.assignTimestampsAndWatermarks(WatermarkStrategy...);
DataStream<MyType> stream = env.addSource(kafkaSource);
请注意,如果您使用后一种方法,并且您的事件在每个 Kafka 分区中按时间顺序排列,则可以利用 Flink Kafka 源提供的每个分区水印,并使用 WatermarkStrategy.forMonotonousTimestamps()
而不是有序有界策略。这有很多优点。
顺便说一下,这与您的问题无关,但您应该知道,通过指定 SlidingEventTimeWindows.of(Time.minutes(20), Time.seconds(20))
,每个事件都将被复制到 60 个重叠的每个 windows.