Flink job can't use savepoint in a batch job
Let me start in a generic way to see whether I somehow missed a concept: I have a streaming Flink job from which I created a savepoint. A simplified version of this job looks like this
Pseudocode:
val flink = StreamExecutionEnvironment.getExecutionEnvironment
val stream = if (batchMode) {
  flink.readFile(path)
} else {
  flink.addKafkaSource(topicName)
}
// keyBy returns a new KeyedStream, so the calls must be chained
val processed = stream
  .keyBy(key)
  .process(new ProcessorWithKeyedState())
CassandraSink.addSink(processed)
This works fine as long as I run the job without a savepoint. If I start the job from a savepoint, I get an exception that looks like this:
Caused by: java.lang.UnsupportedOperationException: Checkpoints are not supported in a single key state backend
at org.apache.flink.streaming.api.operators.sorted.state.NonCheckpointingStorageAccess.resolveCheckpoint(NonCheckpointingStorageAccess.java:43)
at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.restoreSavepoint(CheckpointCoordinator.java:1623)
at org.apache.flink.runtime.scheduler.SchedulerBase.tryRestoreExecutionGraphFromSavepoint(SchedulerBase.java:362)
at org.apache.flink.runtime.scheduler.SchedulerBase.createAndRestoreExecutionGraph(SchedulerBase.java:292)
at org.apache.flink.runtime.scheduler.SchedulerBase.<init>(SchedulerBase.java:249)
I can work around this issue by setting the option:
execution.batch-state-backend.enabled: false
but this ultimately leads to another error:
Caused by: java.lang.IllegalArgumentException: The fraction of memory to allocate should not be 0. Please make sure that all types of managed memory consumers contained in the job are configured with a non-negative weight via `taskmanager.memory.managed.consumer-weights`.
at org.apache.flink.util.Preconditions.checkArgument(Preconditions.java:160)
at org.apache.flink.runtime.memory.MemoryManager.validateFraction(MemoryManager.java:673)
at org.apache.flink.runtime.memory.MemoryManager.computeMemorySize(MemoryManager.java:653)
at org.apache.flink.runtime.memory.MemoryManager.getSharedMemoryResourceForManagedMemory(MemoryManager.java:526)
Of course I tried to set the configuration key taskmanager.memory.managed.consumer-weights (with DATAPROC:70,PYTHON:30), but that did not seem to have any effect.
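For reference, the weights are given as a comma-separated KIND:WEIGHT list in flink-conf.yaml; this is a sketch of the setting tried above, not a claim that it resolves the error:

```yaml
# flink-conf.yaml -- relative managed-memory weights per consumer kind
taskmanager.memory.managed.consumer-weights: DATAPROC:70,PYTHON:30
```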
So I wonder whether I have a conceptual error and a savepoint from a streaming job simply cannot be reused in a batch job, or whether there is a problem in my configuration. Any hints?
After getting a hint from the Flink user group, it turns out that it is not possible to reuse a savepoint from a streaming job in batch execution mode (https://ci.apache.org/projects/flink/flink-docs-master/docs/dev/datastream/execution_mode/#state-backends--state). So instead of running the processing job in batch mode (flink.setRuntimeMode(RuntimeExecutionMode.BATCH)), I run it in the default execution mode (STREAMING). This has the small downside that the job runs forever and someone has to stop it once all data has been processed.
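In pseudocode (mirroring the simplified job above; the names are illustrative), the accepted workaround is simply to drop the batch-mode switch:

val flink = StreamExecutionEnvironment.getExecutionEnvironment
// do NOT call flink.setRuntimeMode(RuntimeExecutionMode.BATCH):
// the default STREAMING mode uses a checkpointing state backend,
// so the savepoint can be restored
val stream = flink.readFile(path)   // bounded input, processed as a stream
val processed = stream
  .keyBy(key)
  .process(new ProcessorWithKeyedState())
CassandraSink.addSink(processed)
// the job keeps running after the file is consumed; cancel it manually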