为什么我的水印没有在我的 Apache Flink 键控流中前进?
Why does my watermark not advance in my Apache Flink keyed stream?
我目前正在将 Apache Flink 1.13.2 与 Java 一起用于我的流应用程序。我正在使用没有 window 功能的键控功能。我已经根据文档实施了水印策略和 autoWatermarkInterval
配置,尽管我的水印没有推进。
我已经通过使用 Flink web UI 并在我的 EventProcessor
KeyedProcessFunction
中打印当前水印进行了双重检查,但是水印一直设置为非常大的负数-9223372036854775808
(最低水印)。
env.getConfig().setAutoWatermarkInterval(1000);
WatermarkStrategy<EventPayload> watermarkStrategy = WatermarkStrategy
.<EventPayload>forMonotonousTimestamps()
.withTimestampAssigner((event, timestamp) -> event.getTimestamp());
DataStream<EventPayload> deserialized = input
.assignTimestampsAndWatermarks(watermarkStrategy)
.flatMap(new Deserializer());
DataStream<EnrichedEventPayload> resultStream =
AsyncDataStream.orderedWait(deserialized, new Enrichment(), 5, TimeUnit.SECONDS, 100);
DataStream<Session> eventsStream = resultStream
.filter(EnrichedEventPayload::getIsEnriched)
.keyBy(EnrichedEventPayload::getId)
.process(new EventProcessor());
我什至尝试将 WatermarkStrategy
添加到它使用 keyBy
的流中(并调整类型以匹配),但仍然没有成功。
DataStream<Session> eventsStream = resultStream
.filter(EnrichedEventPayload::getIsEnriched)
.keyBy(EnrichedEventPayload::getId)
.assignTimestampsAndWatermarks(watermarkStrategy)
.process(new EventProcessor());
我也尝试过使用我自己的 class 实现 WatermarkStrategy
并在 onEvent
函数上设置断点以确保发出新水印,尽管它仍然没有前进(并且任何关联的计时器都没有触发)。
如有任何帮助,我们将不胜感激!
如果水印策略的并行实例之一空闲(即,如果没有事件流经它),就会发生这种情况。在水印策略上使用 withIdleness(...)
选项是解决此问题的一种方法。
我目前正在将 Apache Flink 1.13.2 与 Java 一起用于我的流应用程序。我正在使用没有 window 功能的键控功能。我已经根据文档实施了水印策略和 autoWatermarkInterval
配置,尽管我的水印没有推进。
我已经通过使用 Flink web UI 并在我的 EventProcessor
KeyedProcessFunction
中打印当前水印进行了双重检查,但是水印一直设置为非常大的负数-9223372036854775808
(最低水印)。
env.getConfig().setAutoWatermarkInterval(1000);
WatermarkStrategy<EventPayload> watermarkStrategy = WatermarkStrategy
.<EventPayload>forMonotonousTimestamps()
.withTimestampAssigner((event, timestamp) -> event.getTimestamp());
DataStream<EventPayload> deserialized = input
.assignTimestampsAndWatermarks(watermarkStrategy)
.flatMap(new Deserializer());
DataStream<EnrichedEventPayload> resultStream =
AsyncDataStream.orderedWait(deserialized, new Enrichment(), 5, TimeUnit.SECONDS, 100);
DataStream<Session> eventsStream = resultStream
.filter(EnrichedEventPayload::getIsEnriched)
.keyBy(EnrichedEventPayload::getId)
.process(new EventProcessor());
我什至尝试将 WatermarkStrategy
添加到它使用 keyBy
的流中(并调整类型以匹配),但仍然没有成功。
DataStream<Session> eventsStream = resultStream
.filter(EnrichedEventPayload::getIsEnriched)
.keyBy(EnrichedEventPayload::getId)
.assignTimestampsAndWatermarks(watermarkStrategy)
.process(new EventProcessor());
我也尝试过使用我自己的 class 实现 WatermarkStrategy
并在 onEvent
函数上设置断点以确保发出新水印,尽管它仍然没有前进(并且任何关联的计时器都没有触发)。
如有任何帮助,我们将不胜感激!
如果水印策略的并行实例之一空闲(即,如果没有事件流经它),就会发生这种情况。在水印策略上使用 withIdleness(...)
选项是解决此问题的一种方法。