Apache Flink:Datastream API 的批处理模式失败,异常为“IllegalStateException:排序输入不允许检查点”。

Apache Flink : Batch Mode failing for Datastream API's with exception `IllegalStateException: Checkpointing is not allowed with sorted inputs.`

此的延续:Flink : Handling Keyed Streams with data older than application watermark

根据建议,我一直在尝试在使用 Datastream API 的同一个 Flink 应用程序中添加对 Batch 的支持。

逻辑是这样的:

streamExecutionEnvironment.setRuntimeMode(RuntimeExecutionMode.BATCH);
streamExecutionEnvironment.readTextFile("fileName")
.process(process function which transforms input)
.assignTimestampsAndWatermarks(WatermarkStrategy
                .<DetectionEvent>forBoundedOutOfOrderness(orderness)
                .withTimestampAssigner(
                        (SerializableTimestampAssigner<Event>) (event, l) -> event.getEventTime()))
.keyBy(keyFunction)
.window(TumblingEventWindows(Time.of(x days))
.process(processWindowFunction);

根据 public 文档,我的理解是我只需要将源更改为有界源。但是,在窗口化步骤之后,上述处理在事件触发时继续失败,但出现以下异常:

java.lang.IllegalStateException: Checkpointing is not allowed with sorted inputs.
    at org.apache.flink.util.Preconditions.checkState(Preconditions.java:193)
    at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.init(OneInputStreamTask.java:99)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.executeRestore(StreamTask.java:552)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.runWithCleanUpOnFail(StreamTask.java:647)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:537)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:764)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:571)
    at java.base/java.lang.Thread.run(Thread.java:829)

输入文件包含多个键的历史事件。给定键的数据已排序,但整体数据未排序。我还在每个键的末尾添加了一个事件,时间戳 = MAX_WATERMARK 以指示键控流的结束。我也对单个密钥进行了尝试,但处理失败并出现相同的异常。

注意:我没有启用检查点。 我也试过明确禁用检查点无济于事。

env.getCheckpointConfig().disableCheckpointing();

编辑 - 1

添加更多详细信息: 我尝试更改并使用 FileSource 来读取文件,但仍然遇到相同的异常。

environment.fromSource(FileSource.forRecordStreamFormat(new TextLineFormat(), path).build(),
WatermarkStrategy.noWatermarks(),
"Text File")

第一个流程步骤和密钥拆分有效。但是在那之后它失败了。我尝试删除窗口并添加一个简单的流程步骤,但它仍然失败。 没有明确的接收器。最后一个过程函数只是更新一个数据库。

有什么我遗漏的吗?

只有启用检查点才能抛出该异常。也许你可以在 flink-conf.yaml?

中配置检查点间隔