Can't deploy flow to HA cluster of Apache Flink using Flink CLI
I can deploy a flow to a standalone installation of Apache Flink (one JobManager and several TaskManagers) without any problem:
bin/flink run -m example-app-1.stag.local:6123 -d -p 4 my-flow-fat-jar.jar <flow parameters>
But when I run the same command against a standalone HA cluster, it fails with this error:
------------------------------------------------------------
The program finished with the following exception:
org.apache.flink.client.program.ProgramInvocationException: The program execution failed: JobManager did not respond within 60000 milliseconds
    at org.apache.flink.client.program.Client.runDetached(Client.java:406)
    at org.apache.flink.client.program.Client.runDetached(Client.java:366)
    at org.apache.flink.client.program.DetachedEnvironment.finalizeExecute(DetachedEnvironment.java:75)
    at org.apache.flink.client.program.Client.runDetached(Client.java:278)
    at org.apache.flink.client.CliFrontend.executeProgramDetached(CliFrontend.java:844)
    at org.apache.flink.client.CliFrontend.run(CliFrontend.java:330)
    at org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1189)
    at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1239)
Caused by: org.apache.flink.runtime.client.JobTimeoutException: JobManager did not respond within 60000 milliseconds
    at org.apache.flink.runtime.client.JobClient.submitJobDetached(JobClient.java:221)
    at org.apache.flink.client.program.Client.runDetached(Client.java:403)
    ... 7 more
Caused by: java.util.concurrent.TimeoutException: Futures timed out after [60000 milliseconds]
    at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219)
    at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)
    at scala.concurrent.Await$$anonfun$result.apply(package.scala:190)
    at scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53)
    at scala.concurrent.Await$.result(package.scala:190)
    at scala.concurrent.Await.result(package.scala)
    at org.apache.flink.runtime.client.JobClient.submitJobDetached(JobClient.java:218)
    ... 8 more
The active JobManager writes the following warnings to its log:
2016-04-14 13:54:44,160 WARN akka.remote.ReliableDeliverySupervisor - Association with remote system [akka.tcp://flink@127.0.0.1:62784] has failed, address is now gated for [5000] ms. Reason is: [Disassociated].
2016-04-14 13:54:46,299 WARN org.apache.flink.runtime.jobmanager.JobManager - Discard message LeaderSessionMessage(null,TriggerSavepoint(5de582462f334caee4733c60c6d69fd7)) because the expected leader session ID Some(72630119-fd0a-40e7-8372-45c93781e99f) did not equal the received leader session ID None.
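I don't understand what could cause this error.
Let me know if more information is needed.
P.S.
Deploying from the Flink Dashboard works for the standalone HA cluster. The problem appears only when I deploy via the Flink CLI.
Update
I cleared ZooKeeper, cleared the directories Flink uses on disk, and redeployed the Flink standalone HA cluster. Then I tried to run the flow with the bin/flink run command. As you can see, the JobManager wrote only one line about the problem (see flink--jobmanager-0-example-app-1.stag.local.log).
All JobManagers and TaskManagers use the same flink-conf.yaml: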
jobmanager.heap.mb: 1024
jobmanager.web.port: 8081
taskmanager.data.port: 6121
taskmanager.heap.mb: 2048
taskmanager.numberOfTaskSlots: 4
taskmanager.memory.preallocate: false
taskmanager.tmp.dirs: /flink/data/task_manager
blob.server.port: 6130
blob.storage.directory: /flink/data/blob_storage
parallelism.default: 4
state.backend: filesystem
state.backend.fs.checkpointdir: s3a://example-flink/checkpoints
restart-strategy: none
restart-strategy.fixed-delay.attempts: 2
restart-strategy.fixed-delay.delay: 60s
recovery.mode: zookeeper
recovery.zookeeper.quorum: zookeeper-1.stag.local:2181,zookeeper-2.stag.local:2181,zookeeper-3.stag.local:2181
recovery.zookeeper.path.root: /example/flink
recovery.zookeeper.storageDir: s3a://example-flink/recovery
recovery.jobmanager.port: 6123
fs.hdfs.hadoopconf: /flink/conf
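So the standalone HA cluster appears to be configured correctly.
Update 2
FYI: I want to set up a standalone HA cluster as described here, not a YARN HA cluster.
Update 3
Here is the log created by the bin/flink CLI: flink-username-client-hostname.local.log.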
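
When a Flink cluster is started in HA mode, the JobManager address and its leader ID are written to the specified ZooKeeper quorum. To communicate with the JobManager, you need to know not only its address but also its leader ID. You therefore have to specify the following parameters in the flink-conf.yaml that the CLI reads: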
recovery.mode: zookeeper
recovery.zookeeper.quorum: address of your cluster
recovery.zookeeper.path.root: ZK path you've started your cluster with
With this information, the client knows where to find the ZooKeeper quorum and where to look up the JobManager address and its leader ID.
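
One way to confirm that the client-side flink-conf.yaml actually contains these three keys is a small shell check. This is only a sketch: `check_ha_conf` is a hypothetical helper, not part of Flink, and the path in the usage note is an assumption about where the CLI machine keeps its configuration.

```shell
#!/bin/sh
# Hypothetical helper (not part of Flink): report which of the three HA keys
# from the answer are missing from the flink-conf.yaml passed as $1.
# Returns 0 when all keys are present, 1 otherwise.
check_ha_conf() {
    conf_file="$1"
    missing=0
    for key in recovery.mode recovery.zookeeper.quorum recovery.zookeeper.path.root; do
        # Escape the dots so they match literally, then look for "key:" at the
        # start of a line; commented-out entries ("# key: ...") do not match.
        pattern=$(printf '%s' "$key" | sed 's/\./\\./g')
        if ! grep -q "^${pattern}:" "$conf_file"; then
            echo "missing: ${key}"
            missing=1
        fi
    done
    return "$missing"
}
```

On the machine that runs bin/flink, it could be called as `check_ha_conf /path/to/flink/conf/flink-conf.yaml` (path assumed). It only checks that the keys are present; the values still have to match the quorum and root path the cluster was started with, for example the `recovery.zookeeper.quorum` and `recovery.zookeeper.path.root` entries in the cluster configuration quoted in the question.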