Apache Flink:Datastream API 的批处理模式失败,异常为“IllegalStateException:排序输入不允许检查点”。
Apache Flink : Batch Mode failing for Datastream API's with exception `IllegalStateException: Checkpointing is not allowed with sorted inputs.`
此的延续:Flink : Handling Keyed Streams with data older than application watermark
根据建议,我一直在尝试在使用 Datastream API 的同一个 Flink 应用程序中添加对 Batch 的支持。
逻辑是这样的:
streamExecutionEnvironment.setRuntimeMode(RuntimeExecutionMode.BATCH);
streamExecutionEnvironment.readTextFile("fileName")
.process(process function which transforms input)
.assignTimestampsAndWatermarks(WatermarkStrategy
.<DetectionEvent>forBoundedOutOfOrderness(orderness)
.withTimestampAssigner(
(SerializableTimestampAssigner<Event>) (event, l) -> event.getEventTime()))
.keyBy(keyFunction)
.window(TumblingEventWindows(Time.of(x days))
.process(processWindowFunction);
根据 public 文档,我的理解是我只需要将源更改为有界源。但是,在窗口化步骤之后,上述处理在事件触发时继续失败,但出现以下异常:
java.lang.IllegalStateException: Checkpointing is not allowed with sorted inputs.
at org.apache.flink.util.Preconditions.checkState(Preconditions.java:193)
at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.init(OneInputStreamTask.java:99)
at org.apache.flink.streaming.runtime.tasks.StreamTask.executeRestore(StreamTask.java:552)
at org.apache.flink.streaming.runtime.tasks.StreamTask.runWithCleanUpOnFail(StreamTask.java:647)
at org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:537)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:764)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:571)
at java.base/java.lang.Thread.run(Thread.java:829)
输入文件包含多个键的历史事件。给定键的数据已排序,但整体数据未排序。我还在每个键的末尾添加了一个事件,时间戳 = MAX_WATERMARK 以指示键控流的结束。我也对单个密钥进行了尝试,但处理失败并出现相同的异常。
注意:我没有启用检查点。
我也试过明确禁用检查点无济于事。
env.getCheckpointConfig().disableCheckpointing();
编辑 - 1
添加更多详细信息:
我尝试更改并使用 FileSource 来读取文件,但仍然遇到相同的异常。
environment.fromSource(FileSource.forRecordStreamFormat(new TextLineFormat(), path).build(),
WatermarkStrategy.noWatermarks(),
"Text File")
第一个流程步骤和密钥拆分有效。但是在那之后它失败了。我尝试删除窗口并添加一个简单的流程步骤,但它仍然失败。
没有明确的接收器。最后一个过程函数只是更新一个数据库。
有什么我遗漏的吗?
只有启用检查点才能抛出该异常。也许你可以在 flink-conf.yaml?
中配置检查点间隔
此的延续:Flink : Handling Keyed Streams with data older than application watermark
根据建议,我一直在尝试在使用 Datastream API 的同一个 Flink 应用程序中添加对 Batch 的支持。
逻辑是这样的:
streamExecutionEnvironment.setRuntimeMode(RuntimeExecutionMode.BATCH);
streamExecutionEnvironment.readTextFile("fileName")
.process(process function which transforms input)
.assignTimestampsAndWatermarks(WatermarkStrategy
.<DetectionEvent>forBoundedOutOfOrderness(orderness)
.withTimestampAssigner(
(SerializableTimestampAssigner<Event>) (event, l) -> event.getEventTime()))
.keyBy(keyFunction)
.window(TumblingEventWindows(Time.of(x days))
.process(processWindowFunction);
根据 public 文档,我的理解是我只需要将源更改为有界源。但是,在窗口化步骤之后,上述处理在事件触发时继续失败,但出现以下异常:
java.lang.IllegalStateException: Checkpointing is not allowed with sorted inputs.
at org.apache.flink.util.Preconditions.checkState(Preconditions.java:193)
at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.init(OneInputStreamTask.java:99)
at org.apache.flink.streaming.runtime.tasks.StreamTask.executeRestore(StreamTask.java:552)
at org.apache.flink.streaming.runtime.tasks.StreamTask.runWithCleanUpOnFail(StreamTask.java:647)
at org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:537)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:764)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:571)
at java.base/java.lang.Thread.run(Thread.java:829)
输入文件包含多个键的历史事件。给定键的数据已排序,但整体数据未排序。我还在每个键的末尾添加了一个事件,时间戳 = MAX_WATERMARK 以指示键控流的结束。我也对单个密钥进行了尝试,但处理失败并出现相同的异常。
注意:我没有启用检查点。 我也试过明确禁用检查点无济于事。
env.getCheckpointConfig().disableCheckpointing();
编辑 - 1
添加更多详细信息: 我尝试更改并使用 FileSource 来读取文件,但仍然遇到相同的异常。
environment.fromSource(FileSource.forRecordStreamFormat(new TextLineFormat(), path).build(),
WatermarkStrategy.noWatermarks(),
"Text File")
第一个流程步骤和密钥拆分有效。但是在那之后它失败了。我尝试删除窗口并添加一个简单的流程步骤,但它仍然失败。 没有明确的接收器。最后一个过程函数只是更新一个数据库。
有什么我遗漏的吗?
只有启用检查点才能抛出该异常。也许你可以在 flink-conf.yaml?
中配置检查点间隔