org.apache.flink.runtime.checkpoint.CheckpointException: Some tasks of the job have already finished

I want to stop a Flink job through the REST API, so I sent a request to http://192.168.215.165:8081/jobs/c952ba860604a2c32a7abb9eb5b42b0d/stop and got this response:

{
    "request-id": "29c559399243c817055ebbaf7431a8d2"
}
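The stop call above is a POST to `/jobs/:jobid/stop` (stop-with-savepoint). Its optional JSON body carries `targetDirectory` and `drain`, and the returned `request-id` is the trigger id you poll afterwards. The following is a minimal sketch using only the Python 3 standard library. It assumes the JobManager address from the question, and `FLINK_REST`, `build_stop_request` and `trigger_stop` are hypothetical names chosen for illustration:

```python
import json
import urllib.request

# JobManager REST address, taken from the question (assumption: adjust for your cluster).
FLINK_REST = "http://192.168.215.165:8081"


def build_stop_request(job_id, target_directory=None, drain=False):
    """Build a POST /jobs/:jobid/stop (stop-with-savepoint) request.

    If target_directory is omitted, Flink falls back to the configured
    default savepoint directory (assumption: state.savepoints.dir is set).
    """
    body = {"drain": drain}
    if target_directory is not None:
        body["targetDirectory"] = target_directory
    return urllib.request.Request(
        f"{FLINK_REST}/jobs/{job_id}/stop",
        data=json.dumps(body).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )


def trigger_stop(job_id, **kwargs):
    """Send the stop request and return the trigger id ("request-id")."""
    with urllib.request.urlopen(build_stop_request(job_id, **kwargs)) as resp:
        return json.load(resp)["request-id"]
```

Calling `trigger_stop("c952ba860604a2c32a7abb9eb5b42b0d")` against a live cluster should return a trigger id like the one shown above. Separating request construction from sending keeps the request shape easy to inspect.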

Then I sent a request to http://192.168.215.165:8081/jobs/c952ba860604a2c32a7abb9eb5b42b0d/savepoints/29c559399243c817055ebbaf7431a8d2 and got this response (truncated):

{
    "status": {
        "id": "COMPLETED"
    },
    "operation": {
        "failure-cause": {
            "class": "java.util.concurrent.CompletionException",
            "stack-trace": "java.util.concurrent.CompletionException: org.apache.flink.runtime.checkpoint.CheckpointException: Some tasks of the job have already finished and checkpointing with finished tasks is not enabled. Failure reason: Not all required tasks are currently running.\n\tat java.util.concurrent.CompletableFuture.encodeRelay(CompletableFuture.java:326)\n\tat java.util.concurrent.CompletableFuture.completeRelay(CompletableFuture.java:338)\n\tat java.util.concurrent.CompletableFuture.uniRelay(CompletableFuture.java:925)\n\tat java.util.concurrent.CompletableFuture$UniRelay.tryFire(CompletableFuture.java:913)\n\tat java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)\n\tat java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1990)\n\tat org.apache.flink.runtime.rpc.akka.AkkaInvocationHandler.lambda$invokeRpc$0(AkkaInvocationHandler.java:246)\n\tat java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774)\n\tat java.util.concurrent. 
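Note that `"status": {"id": "COMPLETED"}` only says the asynchronous operation has finished. Whether the savepoint actually succeeded is decided by the `operation` object, which holds either a `location` (success) or a `failure-cause` (failure, as in the response above). The sketch below classifies the outcome and polls the trigger id. It is a hedged illustration assuming Python 3's standard library and the address from the question, and `interpret_savepoint_status` / `wait_for_savepoint` are hypothetical helper names:

```python
import json
import time
import urllib.request

# JobManager REST address, taken from the question (assumption: adjust for your cluster).
FLINK_REST = "http://192.168.215.165:8081"


def interpret_savepoint_status(response):
    """Classify a GET /jobs/:jobid/savepoints/:triggerid response body.

    Returns ("IN_PROGRESS", None), ("SUCCESS", savepoint_location) or
    ("FAILED", first_line_of_failure). "COMPLETED" only means the
    asynchronous operation has finished, not that it succeeded.
    """
    if response["status"]["id"] != "COMPLETED":
        return ("IN_PROGRESS", None)
    operation = response.get("operation", {})
    cause = operation.get("failure-cause")
    if cause is not None:
        # The first line of the serialized stack trace holds the exception message.
        trace = cause.get("stack-trace") or cause.get("class", "")
        return ("FAILED", trace.split("\n", 1)[0])
    return ("SUCCESS", operation.get("location"))


def wait_for_savepoint(job_id, trigger_id, poll_seconds=1.0, timeout=300.0):
    """Poll the trigger id until the stop-with-savepoint operation finishes."""
    url = f"{FLINK_REST}/jobs/{job_id}/savepoints/{trigger_id}"
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        with urllib.request.urlopen(url) as resp:
            state, detail = interpret_savepoint_status(json.load(resp))
        if state != "IN_PROGRESS":
            return state, detail
        time.sleep(poll_seconds)
    raise TimeoutError(f"savepoint {trigger_id} still in progress after {timeout}s")
```

For the response in the question, `interpret_savepoint_status` would report `"FAILED"` with the `CheckpointException` message, so the job was not stopped.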

How can I stop a Flink job through the REST API?

There are a few options:

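  • Set execution.checkpointing.checkpoints-after-tasks-finish.enabled: true in your configuration (this is an experimental feature added in Flink 1.14, but it should work)
  • Modify your job so that all of its tasks are still running at the point when you want to stop it
  • Terminate (cancel) the job without taking a savepoint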
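For the third option, the REST API's job termination endpoint is `PATCH /jobs/:jobid` with the query parameter `mode=cancel`. It answers 202 Accepted and cancels the job asynchronously without writing a savepoint. A minimal sketch, assuming Python 3's standard library and the JobManager address from the question (`build_cancel_request`, `cancel_job` and `job_state` are hypothetical helper names):

```python
import json
import urllib.request

# JobManager REST address, taken from the question (assumption: adjust for your cluster).
FLINK_REST = "http://192.168.215.165:8081"


def build_cancel_request(job_id):
    """PATCH /jobs/:jobid?mode=cancel: terminate the job without a savepoint."""
    return urllib.request.Request(f"{FLINK_REST}/jobs/{job_id}?mode=cancel", method="PATCH")


def cancel_job(job_id):
    """Send the cancel request; Flink replies 202 Accepted and cancels asynchronously."""
    with urllib.request.urlopen(build_cancel_request(job_id)) as resp:
        return resp.status


def job_state(job_id):
    """Read the job's current state (e.g. "RUNNING", "CANCELED") from GET /jobs/:jobid."""
    with urllib.request.urlopen(f"{FLINK_REST}/jobs/{job_id}") as resp:
        return json.load(resp)["state"]
```

Because cancellation is asynchronous, polling `job_state` until it reports `"CANCELED"` is a simple way to confirm the job has actually stopped.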