Flink job can't use savepoint in a batch job

Let me start in a generic way to see whether I have somehow missed a concept: I have a streaming Flink job from which I created a savepoint. A simplified version of this job looks like this.

Pseudo code:

val flink = StreamExecutionEnvironment.getExecutionEnvironment

// bounded file input for reprocessing, Kafka for the regular streaming run
val stream = if (batchMode) {
  flink.readFile(path)
} else {
  flink.addKafkaSource(topicName)
}

// keyed processing that uses keyed state
val processed = stream
  .keyBy(key)
  .process(new ProcessorWithKeyedState())

CassandraSink.addSink(processed)

The job works fine as long as I run it without a savepoint. If I start the job from a savepoint, I get an exception that looks like this:

Caused by: java.lang.UnsupportedOperationException: Checkpoints are not supported in a single key state backend
    at org.apache.flink.streaming.api.operators.sorted.state.NonCheckpointingStorageAccess.resolveCheckpoint(NonCheckpointingStorageAccess.java:43)
    at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.restoreSavepoint(CheckpointCoordinator.java:1623)
    at org.apache.flink.runtime.scheduler.SchedulerBase.tryRestoreExecutionGraphFromSavepoint(SchedulerBase.java:362)
    at org.apache.flink.runtime.scheduler.SchedulerBase.createAndRestoreExecutionGraph(SchedulerBase.java:292)
    at org.apache.flink.runtime.scheduler.SchedulerBase.<init>(SchedulerBase.java:249)

I can work around this problem by setting the option:

execution.batch-state-backend.enabled: false
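
For completeness, a minimal sketch of setting that option programmatically instead of in flink-conf.yaml. This assumes Flink 1.12+, where getExecutionEnvironment also accepts a Configuration; whether a per-job setting is honored here or the key has to go into the cluster's flink-conf.yaml may depend on the setup:

import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment

// Disable the special batch ("single key") state backend used in BATCH execution mode.
val conf = new Configuration()
conf.setString("execution.batch-state-backend.enabled", "false")

// Pass the configuration when creating the environment (Flink 1.12+);
// alternatively, put the key into flink-conf.yaml as shown above.
val flink = StreamExecutionEnvironment.getExecutionEnvironment(conf)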

But this eventually leads to another error:

Caused by: java.lang.IllegalArgumentException: The fraction of memory to allocate should not be 0. Please make sure that all types of managed memory consumers contained in the job are configured with a non-negative weight via `taskmanager.memory.managed.consumer-weights`.
    at org.apache.flink.util.Preconditions.checkArgument(Preconditions.java:160)
    at org.apache.flink.runtime.memory.MemoryManager.validateFraction(MemoryManager.java:673)
    at org.apache.flink.runtime.memory.MemoryManager.computeMemorySize(MemoryManager.java:653)
    at org.apache.flink.runtime.memory.MemoryManager.getSharedMemoryResourceForManagedMemory(MemoryManager.java:526)

Of course I tried to set the config key taskmanager.memory.managed.consumer-weights (with DATAPROC:70,PYTHON:30), but that did not seem to have any effect.
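
For reference, the attempted setting in flink-conf.yaml looked like this (the value DATAPROC:70,PYTHON:30 is the one from the attempt described above):

taskmanager.memory.managed.consumer-weights: DATAPROC:70,PYTHON:30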

So I wonder whether I have a conceptual error and it is not possible to reuse a savepoint from a streaming job in a batch job, or whether something is wrong with my configuration. Any hints?

After a hint from the Flink user group, it turned out that a savepoint from a streaming job cannot be reused in BATCH execution mode (https://ci.apache.org/projects/flink/flink-docs-master/docs/dev/datastream/execution_mode/#state-backends--state). So instead of running the processing job in batch mode (flink.setRuntimeMode(RuntimeExecutionMode.BATCH)), I run it in the default execution mode (STREAMING). This has the small downside that it will run forever and has to be stopped by someone once all the data has been processed.
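
Concretely, the change boils down to not switching the runtime mode for the reprocessing run. A minimal sketch, assuming the same environment setup as in the pseudo code above:

import org.apache.flink.api.common.RuntimeExecutionMode
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment

val flink = StreamExecutionEnvironment.getExecutionEnvironment

// Intentionally NOT done for the reprocessing run:
// flink.setRuntimeMode(RuntimeExecutionMode.BATCH)
// BATCH mode uses the single key state backend, which cannot restore savepoints.
// The default STREAMING mode keeps the regular state backend, so the savepoint
// can be restored; the downside (as noted above) is that the job keeps running
// and has to be stopped manually once all data has been processed.

The savepoint itself is passed at submission time as usual, e.g. with flink run -s <savepointPath>.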