The end timestamp of an event-time window cannot become earlier than the current watermark by merging

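We have built a streaming Flink application running on AWS Kinesis Analytics. It is mainly used to process web clickstream data (page views, sessionization, etc.). Page views arrive from a Kinesis data stream and are split into keyed windows (keyed by session/device token).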

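The application works well at small scale. However, when we scaled it up for testing to our expected normal production throughput (about 1 million page views per day), we periodically hit this error when windows are merged: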

“The end timestamp of an event-time window cannot become earlier than the current watermark by merging.”

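This UnsupportedOperationException crashes our application. When the application restarts, it tries to process the same window again and crashes again, over and over. We traced the exception to the following PR (https://github.com/apache/flink/pull/3587), but we are somewhat at a loss as to how to handle this situation. Our main goal is to keep the application from crashing or corrupting its state in any way.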

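We have tried changing maxOutOfOrderness to see whether the application behaves differently, but we have not found a value at which the error stops occurring, unless we set it to a very low number such as 1.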
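For context, `BoundedOutOfOrdernessTimestampExtractor` derives its watermark from the highest timestamp seen so far minus the configured bound, so changing `maxOutOfOrderness` moves the watermark and therefore changes which events count as late. The sketch below is a plain-Java model of that relationship, not Flink code. The class name `BoundedWatermarkModel` is hypothetical, and unlike Flink, which emits watermarks periodically, the model updates its watermark immediately after every element. It treats an element as late when its timestamp is at or below the current watermark.

```java
// Plain-Java model (not Flink code) of a bounded-out-of-orderness watermark:
// watermark = highest timestamp seen so far - maxOutOfOrderness.
// Simplification (assumption): the watermark is updated after every element,
// whereas Flink emits watermarks periodically.
final class BoundedWatermarkModel {
    private final long maxOutOfOrdernessMs;
    private long maxSeenTimestamp = Long.MIN_VALUE;

    BoundedWatermarkModel(long maxOutOfOrdernessMs) {
        this.maxOutOfOrdernessMs = maxOutOfOrdernessMs;
    }

    /** Records an element's timestamp and returns true if it was already late on arrival. */
    boolean observe(long timestamp) {
        // Lateness is judged against the watermark as it stood before this element.
        boolean late = timestamp <= currentWatermark();
        maxSeenTimestamp = Math.max(maxSeenTimestamp, timestamp);
        return late;
    }

    long currentWatermark() {
        // Avoid underflow before the first element has been seen.
        return maxSeenTimestamp == Long.MIN_VALUE
                ? Long.MIN_VALUE
                : maxSeenTimestamp - maxOutOfOrdernessMs;
    }
}
```

With a 30-minute bound, as in the code below, an event that arrives more than 30 minutes behind the newest timestamp seen so far is already late by the time it reaches the window operator.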

    // Create input data streams from Kinesis data streams
    DataStream<String> pvInput;

    if (env.getIsLocal()) {
        pvInput = createLocalDataStream(streamEnv, "pv-stream", env);
    } else {
        pvInput = createAwsDataStream(streamEnv, env.get("pv-stream"), env);
    }

    ObjectMapper mapper = new ObjectMapper();

    /* SOURCES AND INITIAL MAPPING */

    //Turn pageview strings into pageview objects and assign timestamps
    DataStream<PageView> mappedPvs = pvInput
            .map(value -> mapper.readValue(value, PageView.class)).uid("pv_mapper").name("PV Mapper")
            .filter(value -> value.timestamp != null && value.uuid != null).uid("pv_filter").name("PV Filter")
            .assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<PageView>(Time.minutes(30)) {
                @Override
                public long extractTimestamp(PageView element) {
                    return element.timestamp.getTime();
                }
            }).uid("pv_timestamp_assigner").name("PV Timestamps");

    /* SESSIONIZATION */

    //Key Pageviews by uuid for sessionization
    KeyedStream<PageView, String> keyedPvStream = mappedPvs
            .keyBy((KeySelector<PageView, String>) value -> value.uuid);

    long sessionWindow = 30L;

    //Window pageviews into sessions
    DataStream<PageViewAccumulator> sessionized = keyedPvStream
        .window(ActivitySessionAssigner.withGap(Time.minutes(sessionWindow)))
        .aggregate(new PageViewAggregateFunction()).uid("session_window").name("Session Window");
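Session windows are what make merging happen here. The following plain-Java model assumes that `ActivitySessionAssigner` (whose source isn't shown) behaves like Flink's built-in `EventTimeSessionWindows`: each page view opens a window `[timestamp, timestamp + gap)`, and windows for the same key that overlap or touch are merged into one session. `SessionMergeModel` is a hypothetical name. Flink merges incrementally per key as elements arrive, while this sketch merges a batch of timestamps for a single key at once.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Plain-Java model (not Flink code) of event-time session windows with a fixed gap.
// Assumption: ActivitySessionAssigner assigns [ts, ts + gap) like EventTimeSessionWindows.
final class SessionMergeModel {
    /** A half-open interval [start, end), shaped like Flink's TimeWindow. */
    static final class Window {
        final long start;
        final long end; // exclusive

        Window(long start, long end) {
            this.start = start;
            this.end = end;
        }
    }

    /** Gives each timestamp the window [ts, ts + gapMs) and merges windows that overlap or touch. */
    static List<Window> sessions(List<Long> timestamps, long gapMs) {
        List<Long> sorted = new ArrayList<>(timestamps);
        Collections.sort(sorted);
        List<Window> merged = new ArrayList<>();
        for (long ts : sorted) {
            Window candidate = new Window(ts, ts + gapMs);
            if (!merged.isEmpty() && merged.get(merged.size() - 1).end >= candidate.start) {
                // Extend the current session to cover the new element's window.
                Window last = merged.remove(merged.size() - 1);
                merged.add(new Window(last.start, Math.max(last.end, candidate.end)));
            } else {
                // Gap exceeded: start a new session.
                merged.add(candidate);
            }
        }
        return merged;
    }
}
```

For example, timestamps 0, 1,000,000 and 5,000,000 ms with a 1,800,000 ms (30-minute) gap produce two sessions: [0, 2,800,000) and [5,000,000, 6,800,000).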

Expected result: merging windows never produces a window whose end timestamp is earlier than the current watermark.

Actual result: such windows do occur, causing the following exception:

{
    "locationInformation": "org.apache.flink.runtime.executiongraph.ExecutionGraph.transitionState(ExecutionGraph.java:1384)",
    "logger": "org.apache.flink.runtime.executiongraph.ExecutionGraph",
    "message": "Failure type is SYSTEM on RUNNING -> FAILING.",
    "throwableInformation": [
        "java.lang.UnsupportedOperationException: The end timestamp of an event-time window cannot become earlier than the current watermark by merging. Current watermark: 1555506438433 window: TimeWindow{start=1555455813013, end=1555457829192}",
        "\tat org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.merge(WindowOperator.java:320)",
        "\tat org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.merge(WindowOperator.java:311)",
        "\tat org.apache.flink.streaming.runtime.operators.windowing.MergingWindowSet.addWindow(MergingWindowSet.java:212)",
        "\tat org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.processElement(WindowOperator.java:311)",
        "\tat org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:202)",
        "\tat org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:105)",
        "\tat org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300)",
        "\tat org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)",
        "\tat java.lang.Thread.run(Thread.java:748)"
    ],
    "threadName": "flink-akka.actor.default-dispatcher-16170",
    "applicationARN": "arn:aws:kinesisanalytics:us-xxx-x:XXXXXXXXXXXXX:application/XXXXX",
    "applicationVersionId": "6",
    "messageSchemaVersion": "1",
    "messageType": "INFO"
}
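The numbers in this log can be checked against the condition behind the exception. To our reading of the Flink source, `WindowOperator.merge` throws when the merged window's max timestamp (its end minus 1) plus the allowed lateness is at or before the current watermark. The sketch below paraphrases that check in plain Java rather than calling Flink, and assumes an allowed lateness of 0, the default when `allowedLateness(...)` is not set, as in the code above. `MergeGuardModel` is a hypothetical name.

```java
// Paraphrased model (not Flink code) of the guard that throws in WindowOperator.merge.
// Assumption: a window's max timestamp is end - 1, as with Flink's TimeWindow.maxTimestamp().
final class MergeGuardModel {
    /** True if a merged window ending at windowEnd would trigger the exception. */
    static boolean wouldThrow(long windowEnd, long allowedLatenessMs, long watermark) {
        long maxTimestamp = windowEnd - 1;
        return maxTimestamp + allowedLatenessMs <= watermark;
    }

    /** How far (in ms) the window's end lies behind the watermark. */
    static long msBehindWatermark(long windowEnd, long watermark) {
        return watermark - windowEnd;
    }

    public static void main(String[] args) {
        // Values taken from the exception message above.
        long watermark = 1555506438433L;
        long start = 1555455813013L;
        long end = 1555457829192L;
        System.out.println("window length ms: " + (end - start));
        System.out.println("ms behind watermark: " + msBehindWatermark(end, watermark));
        System.out.println("would throw: " + wouldThrow(end, 0L, watermark));
    }
}
```

Run against the logged values, the session spans 2,016,179 ms (about 33.6 minutes) and ends 48,609,241 ms (about 13.5 hours) before the watermark, far beyond the 30-minute out-of-orderness bound.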

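This problem can be solved by filtering out late events with a ProcessFunction like the one below. Placing this function between the timestamp extractor and the window function drops all late events, which eliminates the possibility of this error.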

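import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;

// Forwards only elements whose timestamp is strictly after the current watermark; late elements are dropped.
public class LateEventFilter extends ProcessFunction<PageView, PageView> {
    @Override
    public void processElement(PageView value, Context ctx, Collector<PageView> out) throws Exception {
        // ctx.timestamp() is set because timestamps are assigned upstream by the timestamp extractor.
        if (ctx.timestamp() > ctx.timerService().currentWatermark()) {
            out.collect(value);
        }
    }
}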

You can also use a similar function to send late events to a sink, as in the following example.

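import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;

// Emits only late elements (timestamp at or before the current watermark) so they can be routed to a separate sink.
public class LateEventSideOutput extends ProcessFunction<PageView, PageView> {
    @Override
    public void processElement(PageView value, Context ctx, Collector<PageView> out) throws Exception {
        if (ctx.timestamp() <= ctx.timerService().currentWatermark()) {
            out.collect(value);
        }
    }
}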
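`LateEventSideOutput` emits late events on its regular output; it does not use Flink's side-output mechanism (an `OutputTag` written with `Context.output(...)` and read back via `getSideOutput(...)`), which would let a single `ProcessFunction` route each element to exactly one of two streams. The plain-Java model below illustrates that single-pass routing without depending on Flink. `LateSplitModel` and `PageViewStub` are hypothetical stand-ins, and the predicate is the same one used in `LateEventFilter`.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java model (not Flink code) of routing each page view exactly once:
// on-time events go to the main list, late events to a separate "side" list.
final class LateSplitModel {
    /** Hypothetical stand-in for the PageView class. */
    static final class PageViewStub {
        final String uuid;
        final long timestamp;

        PageViewStub(String uuid, long timestamp) {
            this.uuid = uuid;
            this.timestamp = timestamp;
        }
    }

    final List<PageViewStub> onTime = new ArrayList<>();
    final List<PageViewStub> late = new ArrayList<>();

    /** Same predicate as LateEventFilter: on time only if strictly after the watermark. */
    void route(PageViewStub pv, long currentWatermark) {
        if (pv.timestamp > currentWatermark) {
            onTime.add(pv);
        } else {
            late.add(pv);
        }
    }
}
```

Because one comparison decides both outputs, the on-time and late streams are complementary by construction; with two separate functions, that holds only as long as the two conditions are kept in sync.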

Wiring it all together looks like this:

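DataStream<PageView> lateFilteredPvs = mappedPvs.process(new LateEventFilter()).uid("late_pv_filter").name("LatePvFilter");

DataStream<PageView> latePvs = mappedPvs.process(new LateEventSideOutput()).uid("late_pv").name("LatePv");

latePvs.addSink(latePvSink).uid("late_pv_sink").name("LatePvSink");

// Sessionize the filtered stream instead of mappedPvs
KeyedStream<PageView, String> keyedPvStream = lateFilteredPvs
        .keyBy((KeySelector<PageView, String>) value -> value.uuid);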