Flink StateFun 高可用异常:"java.lang.IllegalStateException: There is no operator for the state ....."

Flink StateFun high availability exception: "java.lang.IllegalStateException: There is no operator for the state ....."

我有 2 个与 StateFun 应用程序 运行 在 Kubernetes 上的高可用性相关的问题

以下是有关我的设置的详细信息:

1- 我尝试了 Zookeeper 和 Kubernetes HA 设置,结果是一样的(下面的日志来自 Zookeeper HA env)。当我杀死 jobmanager pod 时,minikube 启动了另一个 pod,这个新 pod 在尝试加载最后一个检查点时失败了:

...
2021-12-11 14:25:26,426 INFO  org.apache.flink.runtime.jobmaster.JobMaster                 [] - Initializing job myStatefunApp (00000000000000000000000000000000).
2021-12-11 14:25:26,443 INFO  org.apache.flink.runtime.jobmaster.JobMaster                 [] - Using restart back off time strategy FixedDelayRestartBackoffTimeStrategy(maxNumberRestartAttempts=2147483647, backoffTimeMS=1000) for myStatefunApp (00000000000000000000000000000000).
2021-12-11 14:25:26,516 INFO  org.apache.flink.runtime.util.ZooKeeperUtils                 [] - Initialized DefaultCompletedCheckpointStore in 'ZooKeeperStateHandleStore{namespace='statefun_zk_recovery/my-statefun-app/checkpoints/00000000000000000000000000000000'}' with /checkpoints/00000000000000000000000000000000.
2021-12-11 14:25:26,599 INFO  org.apache.flink.runtime.jobmaster.JobMaster                 [] - Running initialization on master for job myStatefunApp (00000000000000000000000000000000).
2021-12-11 14:25:26,599 INFO  org.apache.flink.runtime.jobmaster.JobMaster                 [] - Successfully ran initialization on master in 0 ms.
2021-12-11 14:25:26,617 INFO  org.apache.flink.runtime.scheduler.adapter.DefaultExecutionTopology [] - Built 1 pipelined regions in 1 ms
2021-12-11 14:25:26,626 INFO  org.apache.flink.runtime.jobmaster.JobMaster                 [] - Using job/cluster config to configure application-defined state backend: EmbeddedRocksDBStateBackend{, localRocksDbDirectories=null, enableIncrementalCheckpointing=TRUE, numberOfTransferThreads=1, writeBatchSize=2097152}
2021-12-11 14:25:26,627 INFO  org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend [] - Using predefined options: DEFAULT.
2021-12-11 14:25:26,627 INFO  org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend [] - Using application-defined options factory: DefaultConfigurableOptionsFactory{configuredOptions={state.backend.rocksdb.thread.num=1}}.
2021-12-11 14:25:26,627 INFO  org.apache.flink.runtime.jobmaster.JobMaster                 [] - Using application-defined state backend: EmbeddedRocksDBStateBackend{, localRocksDbDirectories=null, enableIncrementalCheckpointing=TRUE, numberOfTransferThreads=1, writeBatchSize=2097152}
2021-12-11 14:25:26,631 INFO  org.apache.flink.runtime.jobmaster.JobMaster                 [] - Checkpoint storage is set to 'filesystem': (checkpoints "hdfs://hdfs-namenode:8020/tmp/statefun_checkpoints/myStatefunApp")
2021-12-11 14:25:26,712 INFO  org.apache.flink.runtime.checkpoint.DefaultCompletedCheckpointStore [] - Recovering checkpoints from ZooKeeperStateHandleStore{namespace='statefun_zk_recovery/my-statefun-app/checkpoints/00000000000000000000000000000000'}.
2021-12-11 14:25:26,724 INFO  org.apache.flink.runtime.checkpoint.DefaultCompletedCheckpointStore [] - Found 1 checkpoints in ZooKeeperStateHandleStore{namespace='statefun_zk_recovery/my-statefun-app/checkpoints/00000000000000000000000000000000'}.
2021-12-11 14:25:26,725 INFO  org.apache.flink.runtime.checkpoint.DefaultCompletedCheckpointStore [] - Trying to fetch 1 checkpoints from storage.
2021-12-11 14:25:26,725 INFO  org.apache.flink.runtime.checkpoint.DefaultCompletedCheckpointStore [] - Trying to retrieve checkpoint 2.
2021-12-11 14:25:26,931 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator    [] - Restoring job 00000000000000000000000000000000 from Checkpoint 2 @ 1639232587220 for 00000000000000000000000000000000 located at hdfs://hdfs-namenode:8020/tmp/statefun_checkpoints/myStatefunApp/00000000000000000000000000000000/chk-2.
2021-12-11 14:25:27,012 ERROR org.apache.flink.runtime.entrypoint.ClusterEntrypoint        [] - Fatal error occurred in the cluster entrypoint.
org.apache.flink.util.FlinkException: JobMaster for job 00000000000000000000000000000000 failed.
    at org.apache.flink.runtime.dispatcher.Dispatcher.jobMasterFailed(Dispatcher.java:873) ~[flink-dist_2.12-1.13.2.jar:1.13.2]
    at org.apache.flink.runtime.dispatcher.Dispatcher.jobManagerRunnerFailed(Dispatcher.java:459) ~[flink-dist_2.12-1.13.2.jar:1.13.2]
    at org.apache.flink.runtime.dispatcher.Dispatcher.handleJobManagerRunnerResult(Dispatcher.java:436) ~[flink-dist_2.12-1.13.2.jar:1.13.2]
    at org.apache.flink.runtime.dispatcher.Dispatcher.lambda$runJob(Dispatcher.java:415) ~[flink-dist_2.12-1.13.2.jar:1.13.2]
    at java.util.concurrent.CompletableFuture.uniHandle(Unknown Source) ~[?:?]
    at java.util.concurrent.CompletableFuture$UniHandle.tryFire(Unknown Source) ~[?:?]
    at java.util.concurrent.CompletableFuture$Completion.run(Unknown Source) ~[?:?]
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:440) ~[flink-dist_2.12-1.13.2.jar:1.13.2]
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:208) ~[flink-dist_2.12-1.13.2.jar:1.13.2]
    at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:77) ~[flink-dist_2.12-1.13.2.jar:1.13.2]
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:158) ~[flink-dist_2.12-1.13.2.jar:1.13.2]
    at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26) [flink-dist_2.12-1.13.2.jar:1.13.2]
    at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21) [flink-dist_2.12-1.13.2.jar:1.13.2]
    at scala.PartialFunction.applyOrElse(PartialFunction.scala:123) [flink-dist_2.12-1.13.2.jar:1.13.2]
    at scala.PartialFunction.applyOrElse$(PartialFunction.scala:122) [flink-dist_2.12-1.13.2.jar:1.13.2]
    at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21) [flink-dist_2.12-1.13.2.jar:1.13.2]
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) [flink-dist_2.12-1.13.2.jar:1.13.2]
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172) [flink-dist_2.12-1.13.2.jar:1.13.2]
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172) [flink-dist_2.12-1.13.2.jar:1.13.2]
    at akka.actor.Actor.aroundReceive(Actor.scala:517) [flink-dist_2.12-1.13.2.jar:1.13.2]
    at akka.actor.Actor.aroundReceive$(Actor.scala:515) [flink-dist_2.12-1.13.2.jar:1.13.2]
    at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225) [flink-dist_2.12-1.13.2.jar:1.13.2]
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592) [flink-dist_2.12-1.13.2.jar:1.13.2]
    at akka.actor.ActorCell.invoke(ActorCell.scala:561) [flink-dist_2.12-1.13.2.jar:1.13.2]
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258) [flink-dist_2.12-1.13.2.jar:1.13.2]
    at akka.dispatch.Mailbox.run(Mailbox.scala:225) [flink-dist_2.12-1.13.2.jar:1.13.2]
    at akka.dispatch.Mailbox.exec(Mailbox.scala:235) [flink-dist_2.12-1.13.2.jar:1.13.2]
    at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) [flink-dist_2.12-1.13.2.jar:1.13.2]
    at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) [flink-dist_2.12-1.13.2.jar:1.13.2]
    at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) [flink-dist_2.12-1.13.2.jar:1.13.2]
    at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) [flink-dist_2.12-1.13.2.jar:1.13.2]
Caused by: org.apache.flink.runtime.client.JobInitializationException: Could not start the JobMaster.
    at org.apache.flink.runtime.jobmaster.DefaultJobMasterServiceProcess.lambda$new[=11=](DefaultJobMasterServiceProcess.java:97) ~[flink-dist_2.12-1.13.2.jar:1.13.2]
    at java.util.concurrent.CompletableFuture.uniWhenComplete(Unknown Source) ~[?:?]
    at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(Unknown Source) ~[?:?]
    at java.util.concurrent.CompletableFuture.postComplete(Unknown Source) ~[?:?]
    at java.util.concurrent.CompletableFuture$AsyncSupply.run(Unknown Source) ~[?:?]
    at java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source) ~[?:?]
    at java.util.concurrent.FutureTask.run(Unknown Source) ~[?:?]
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(Unknown Source) ~[?:?]
    at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source) ~[?:?]
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source) ~[?:?]
    at java.lang.Thread.run(Unknown Source) ~[?:?]
Caused by: java.util.concurrent.CompletionException: java.lang.IllegalStateException: There is no operator for the state 2edd7b5dafb2c271440b25f6da5f4532
    at java.util.concurrent.CompletableFuture.encodeThrowable(Unknown Source) ~[?:?]
    at java.util.concurrent.CompletableFuture.completeThrowable(Unknown Source) ~[?:?]
    at java.util.concurrent.CompletableFuture$AsyncSupply.run(Unknown Source) ~[?:?]
    at java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source) ~[?:?]
    at java.util.concurrent.FutureTask.run(Unknown Source) ~[?:?]
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(Unknown Source) ~[?:?]
    at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source) ~[?:?]
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source) ~[?:?]
    at java.lang.Thread.run(Unknown Source) ~[?:?]
Caused by: java.lang.IllegalStateException: There is no operator for the state 2edd7b5dafb2c271440b25f6da5f4532
    at org.apache.flink.runtime.checkpoint.StateAssignmentOperation.checkStateMappingCompleteness(StateAssignmentOperation.java:712) ~[flink-dist_2.12-1.13.2.jar:1.13.2]
    at org.apache.flink.runtime.checkpoint.StateAssignmentOperation.assignStates(StateAssignmentOperation.java:100) ~[flink-dist_2.12-1.13.2.jar:1.13.2]
    at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.restoreLatestCheckpointedStateInternal(CheckpointCoordinator.java:1562) ~[flink-dist_2.12-1.13.2.jar:1.13.2]
    at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.restoreInitialCheckpointIfPresent(CheckpointCoordinator.java:1476) ~[flink-dist_2.12-1.13.2.jar:1.13.2]
    at org.apache.flink.runtime.scheduler.DefaultExecutionGraphFactory.createAndRestoreExecutionGraph(DefaultExecutionGraphFactory.java:134) ~[flink-dist_2.12-1.13.2.jar:1.13.2]
    at org.apache.flink.runtime.scheduler.SchedulerBase.createAndRestoreExecutionGraph(SchedulerBase.java:342) ~[flink-dist_2.12-1.13.2.jar:1.13.2]
    at org.apache.flink.runtime.scheduler.SchedulerBase.<init>(SchedulerBase.java:190) ~[flink-dist_2.12-1.13.2.jar:1.13.2]
    at org.apache.flink.runtime.scheduler.DefaultScheduler.<init>(DefaultScheduler.java:122) ~[flink-dist_2.12-1.13.2.jar:1.13.2]
    at org.apache.flink.runtime.scheduler.DefaultSchedulerFactory.createInstance(DefaultSchedulerFactory.java:132) ~[flink-dist_2.12-1.13.2.jar:1.13.2]
    at org.apache.flink.runtime.jobmaster.DefaultSlotPoolServiceSchedulerFactory.createScheduler(DefaultSlotPoolServiceSchedulerFactory.java:110) ~[flink-dist_2.12-1.13.2.jar:1.13.2]
    at org.apache.flink.runtime.jobmaster.JobMaster.createScheduler(JobMaster.java:340) ~[flink-dist_2.12-1.13.2.jar:1.13.2]
    at org.apache.flink.runtime.jobmaster.JobMaster.<init>(JobMaster.java:317) ~[flink-dist_2.12-1.13.2.jar:1.13.2]
    at org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.internalCreateJobMasterService(DefaultJobMasterServiceFactory.java:107) ~[flink-dist_2.12-1.13.2.jar:1.13.2]
    at org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.lambda$createJobMasterService[=11=](DefaultJobMasterServiceFactory.java:95) ~[flink-dist_2.12-1.13.2.jar:1.13.2]
    at org.apache.flink.util.function.FunctionUtils.lambda$uncheckedSupplier(FunctionUtils.java:112) ~[flink-dist_2.12-1.13.2.jar:1.13.2]
    at java.util.concurrent.CompletableFuture$AsyncSupply.run(Unknown Source) ~[?:?]
    at java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source) ~[?:?]
    at java.util.concurrent.FutureTask.run(Unknown Source) ~[?:?]
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(Unknown Source) ~[?:?]
    at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source) ~[?:?]
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source) ~[?:?]
    at java.lang.Thread.run(Unknown Source) ~[?:?]
2021-12-11 14:25:27,017 INFO  org.apache.flink.runtime.entrypoint.ClusterEntrypoint        [] - Shutting StatefulFunctionsClusterEntryPoint down with application status UNKNOWN. Diagnostics Cluster entrypoint has been closed externally..
2021-12-11 14:25:27,021 INFO  org.apache.flink.runtime.jobmaster.MiniDispatcherRestEndpoint [] - Shutting down rest endpoint.
2021-12-11 14:25:27,025 INFO  org.apache.flink.runtime.blob.BlobServer                     [] - Stopped BLOB server at 0.0.0.0:6124
2021-12-11 14:25:27,034 INFO  org.apache.flink.runtime.jobmaster.MiniDispatcherRestEndpoint [] - Removing cache directory /tmp/flink-web-6c2dafc9-bb7d-489a-9e2d-cf78e3f19b67/flink-web-ui
2021-12-11 14:25:27,035 INFO  org.apache.flink.runtime.leaderelection.DefaultLeaderElectionService [] - Stopping DefaultLeaderElectionService.
2021-12-11 14:25:27,035 INFO  org.apache.flink.runtime.leaderelection.ZooKeeperLeaderElectionDriver [] - Closing ZooKeeperLeaderElectionDriver{leaderPath='/leader/rest_server_lock'}
2021-12-11 14:25:27,036 INFO  org.apache.flink.runtime.jobmaster.MiniDispatcherRestEndpoint [] - Shut down complete.
2021-12-11 14:25:27,036 INFO  org.apache.flink.runtime.entrypoint.component.DispatcherResourceManagerComponent [] - Closing components.
2021-12-11 14:25:27,037 INFO  org.apache.flink.runtime.leaderretrieval.DefaultLeaderRetrievalService [] - Stopping DefaultLeaderRetrievalService.
2021-12-11 14:25:27,037 INFO  org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalDriver [] - Closing ZookeeperLeaderRetrievalDriver{retrievalPath='/leader/dispatcher_lock'}.
2021-12-11 14:25:27,037 INFO  org.apache.flink.runtime.leaderretrieval.DefaultLeaderRetrievalService [] - Stopping DefaultLeaderRetrievalService.
2021-12-11 14:25:27,037 INFO  org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalDriver [] - Closing ZookeeperLeaderRetrievalDriver{retrievalPath='/leader/resource_manager_lock'}.
2021-12-11 14:25:27,038 INFO  org.apache.flink.runtime.leaderelection.DefaultLeaderElectionService [] - Stopping DefaultLeaderElectionService.
2021-12-11 14:25:27,038 INFO  org.apache.flink.runtime.leaderelection.ZooKeeperLeaderElectionDriver [] - Closing ZooKeeperLeaderElectionDriver{leaderPath='/leader/dispatcher_lock'}
2021-12-11 14:25:27,039 INFO  org.apache.flink.runtime.dispatcher.runner.JobDispatcherLeaderProcess [] - Stopping JobDispatcherLeaderProcess.
2021-12-11 14:25:27,040 INFO  org.apache.flink.runtime.resourcemanager.slotmanager.DeclarativeSlotManager [] - Closing the slot manager.
2021-12-11 14:25:27,040 INFO  org.apache.flink.runtime.resourcemanager.slotmanager.DeclarativeSlotManager [] - Suspending the slot manager.
2021-12-11 14:25:27,041 INFO  org.apache.flink.runtime.leaderelection.DefaultLeaderElectionService [] - Stopping DefaultLeaderElectionService.
2021-12-11 14:25:27,041 INFO  org.apache.flink.runtime.leaderelection.ZooKeeperLeaderElectionDriver [] - Closing ZooKeeperLeaderElectionDriver{leaderPath='/leader/resource_manager_lock'}

我认为在使用 StateFun 时无法为 Flink 运算符指定 ID(如 here 所述)是导致此问题的原因。虽然一开始运行良好,但操作员分配了一些随机 ID,检查点也运行良好。重启后,operators 被分配了其他随机 id,当 jobmanager(在本例中为 statefun master)尝试加载状态“2edd7b5dafb2c271440b25f6da5f4532”时,它找不到最初分配给它的 operator。

有人可以确认我的想法是正确的和/或给我指导,让我的 StateFun 应用程序以高可用性工作吗?

要注意的另一件有趣的事情是,在重新启动Jobmanager POD之后,有时可以从检查点...从检查点... state to restore”日志(link)——这让我不确定它是否真的恢复了,或者它只是开始丢弃最后一个成功检查点的状态。这可能是什么原因造成的?真的从checkpoint恢复成功了吗?

2- 对于 Kubernetes 部署,在 StateFun 部署文档 (link) Deployment type is used for jobmanager component. On the other hand Flink deployment documentation (Standalone / Kubernetes section) (link) 上使用 Jobmanager 的作业类型来实现高可用设置 (jobmanager-application-ha.yaml 文件)

基本上,由于 Kubernetes 会在失败时重新启动 pod,因此可以使用 Job 或 Deployment。但问题是,当我们尝试使用保存点停止作业并使用部署类型时,Kubernetes 会重新启动 pod,而不管保存点创建成功和成功退出状态 (0)。

我们是否应该在 Kubernetes 上 运行 时不停止带有保存点的 StateFun 应用程序?我知道相关错误 (link) - but although it seems to be deprecated I can do a cancel with savepoint - are we supposed to just delete deployment as told in High availability data clean up section? (link)

第一个问题的更新:我打开了调试日志记录,并且可以连续捕获异常和成功启动的会话。以下为未成功者:

...
2021-12-11 21:55:14,001 DEBUG org.apache.flink.streaming.api.graph.StreamGraphHasherV2     [] - Generated hash '32d5ca33c915e65563a5c7f4d62703ad' for node 'router (my-ingress-1-in)-5' {id: 5, parallelism: 1, user function: }
2021-12-11 21:55:14,001 DEBUG org.apache.flink.streaming.api.graph.StreamGraphHasherV2     [] - Generated hash '33b86fe798648d648b237ddfc986200d' for node 'router (my-ingress-2-in)-4' {id: 4, parallelism: 1, user function: }
2021-12-11 21:55:14,001 DEBUG org.apache.flink.streaming.api.graph.StreamGraphHasherV2     [] - Generated hash 'bd4c3fa1570bbcf606f2dabddd61ed7f' for node 'router (my-ingress-3-in)-6' {id: 6, parallelism: 1, user function: } 

这是成功的:

2021-12-11 21:55:34,543 DEBUG org.apache.flink.streaming.api.graph.StreamGraphHasherV2     [] - Generated hash 'a1448ecf31ac98d2215c38bfd119abe0' for node 'router (my-ingress-3-in)-5' {id: 5, parallelism: 1, user function: }
2021-12-11 21:55:34,543 DEBUG org.apache.flink.streaming.api.graph.StreamGraphHasherV2     [] - Generated hash '05037ff96baea131d9cf1390846efd98' for node 'router (my-ingress-1-in)-4' {id: 4, parallelism: 1, user function: }
2021-12-11 21:55:34,543 DEBUG org.apache.flink.streaming.api.graph.StreamGraphHasherV2     [] - Generated hash '2edd7b5dafb2c271440b25f6da5f4532' for node 'router (my-ingress-2-in)-6' {id: 6, parallelism: 1, user function: }

似乎两次运行之间生成的哈希值的计算方式不同。

在 statefun <= 3.2 中,路由器没有手动指定的 UID。虽然 Flinks 内部 UID 生成是确定性的,但 statefun 生成底层流图的方式在某些情况下可能并非如此。这是一个错误。我已经打开一个 PR 以向后兼容的方式解决这个问题[1]。

[1] https://github.com/apache/flink-statefun/pull/279