为什么我的水印没有在我的 Apache Flink 键控流中前进?

Why does my watermark not advance in my Apache Flink keyed stream?

我目前正在将 Apache Flink 1.13.2 与 Java 一起用于我的流应用程序。我正在使用没有 window 功能的键控功能。我已经根据文档实施了水印策略和 autoWatermarkInterval 配置,尽管我的水印没有推进。

我已经通过使用 Flink web UI 并在我的 EventProcessor KeyedProcessFunction 中打印当前水印进行了双重检查,但是水印一直设置为非常大的负数-9223372036854775808(最低水印)。

env.getConfig().setAutoWatermarkInterval(1000);

WatermarkStrategy<EventPayload> watermarkStrategy = WatermarkStrategy
        .<EventPayload>forMonotonousTimestamps()
        .withTimestampAssigner((event, timestamp) -> event.getTimestamp());

DataStream<EventPayload> deserialized = input
        .assignTimestampsAndWatermarks(watermarkStrategy)
        .flatMap(new Deserializer());

DataStream<EnrichedEventPayload> resultStream =
        AsyncDataStream.orderedWait(deserialized, new Enrichment(), 5, TimeUnit.SECONDS, 100);

DataStream<Session> eventsStream = resultStream
        .filter(EnrichedEventPayload::getIsEnriched)
        .keyBy(EnrichedEventPayload::getId)
        .process(new EventProcessor());

我什至尝试将 WatermarkStrategy 添加到它使用 keyBy 的流中(并调整类型以匹配),但仍然没有成功。

DataStream<Session> eventsStream = resultStream
        .filter(EnrichedEventPayload::getIsEnriched)
        .keyBy(EnrichedEventPayload::getId)
        .assignTimestampsAndWatermarks(watermarkStrategy)
        .process(new EventProcessor());

我也尝试过使用我自己的 class 实现 WatermarkStrategy 并在 onEvent 函数上设置断点以确保发出新水印,尽管它仍然没有前进(并且任何关联的计时器都没有触发)。

如有任何帮助,我们将不胜感激!

如果水印策略的并行实例之一空闲(即,如果没有事件流经它),就会发生这种情况。在水印策略上使用 withIdleness(...) 选项是解决此问题的一种方法。