Flink Fencing Errors in K8 HA mode
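I am using Flink 1.12 and trying to run the JobManager in HA mode on a Kubernetes cluster (AKS). I am running 2 JobManager pods and 2 TaskManager pods.
The problem I am facing is that the TaskManagers cannot find the JobManager leader.
The reason is that they try to reach the JobManager's K8s Service (a ClusterIP Service) instead of the leader's pod IP. As a result, the Service sometimes routes the registration call to the standby JobManager, so the TaskManager cannot find the JobManager leader.
Here is the content of the jobmanager-leader file: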
{
  "apiVersion": "v1",
  "data": {
    "address": "akka.tcp://flink@flink-jobmanager:6123/user/rpc/jobmanager_2",
    "sessionId": "ee14c446-82b0-45ab-b470-ee445ddd0e0f"
  },
  "kind": "ConfigMap",
  "metadata": {
    "annotations": {
      "control-plane.alpha.kubernetes.io/leader": "{\"holderIdentity\":\"e6a42a4f-235e-4b97-93c6-40f4b987f56b\",\"leaseDuration\":15.000000000,\"acquireTime\":\"2021-02-16T05:13:37.365000Z\",\"renewTime\":\"2021-02-16T05:22:17.386000Z\",\"leaderTransitions\":105}"
    },
    "creationTimestamp": "2021-02-15T16:13:26Z",
    "labels": {
      "app": "flinktestk8cluster",
      "configmap-type": "high-availability",
      "type": "flink-native-kubernetes"
    },
    "name": "flinktestk8cluster-bc7b6f9aa8b0a111e1c50b10155a85be-jobmanager-leader",
    "namespace": "default",
    "resourceVersion": "46202881",
    "selfLink": "/api/v1/namespaces/default/configmaps/flinktestk8cluster-bc7b6f9aa8b0a111e1c50b10155a85be-jobmanager-leader",
    "uid": "1d5ca6e3-dc7e-4fb7-9fab-c1bbb956cda9"
  }
}
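Here, flink-jobmanager is the name of the JobManager's K8s Service.
Is there a way to fix this? How can I make the JobManager write the pod IP instead of the Service name into the leader file?
Here is the exception: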
2021-02-12 06:15:53,849 ERROR org.apache.flink.runtime.taskexecutor.TaskExecutor [] - Registration at ResourceManager failed due to an error
java.util.concurrent.CompletionException: org.apache.flink.runtime.rpc.exceptions.FencingTokenException: Fencing token not set: Ignoring message RemoteFencedMessage(954fe694bb4d268a2e32b4497e944144, RemoteRpcInvocation(registerTaskExecutor(TaskExecutorRegistration, Time))) sent to akka.tcp://flink@flink-jobmanager:6123/user/rpc/resourcemanager_0 because the fencing token is null.
at java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:292) ~[?:1.8.0_275]
at java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:308) ~[?:1.8.0_275]
at java.util.concurrent.CompletableFuture.uniAccept(CompletableFuture.java:661) ~[?:1.8.0_275]
at java.util.concurrent.CompletableFuture$UniAccept.tryFire(CompletableFuture.java:646) ~[?:1.8.0_275]
at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488) ~[?:1.8.0_275]
at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1990) ~[?:1.8.0_275]
at org.apache.flink.runtime.rpc.akka.AkkaInvocationHandler.lambda$invokeRpc$0(AkkaInvocationHandler.java:235) ~[flink-dist_2.12-1.12.1.jar:1.12.1]
at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774) ~[?:1.8.0_275]
at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750) ~[?:1.8.0_275]
at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488) ~[?:1.8.0_275]
at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1990) ~[?:1.8.0_275]
at org.apache.flink.runtime.concurrent.FutureUtils.onComplete(FutureUtils.java:1044) ~[flink-dist_2.12-1.12.1.jar:1.12.1]
at akka.dispatch.OnComplete.internal(Future.scala:263) ~[flink-dist_2.12-1.12.1.jar:1.12.1]
at akka.dispatch.OnComplete.internal(Future.scala:261) ~[flink-dist_2.12-1.12.1.jar:1.12.1]
at akka.dispatch.japi$CallbackBridge.apply(Future.scala:191) ~[flink-dist_2.12-1.12.1.jar:1.12.1]
at akka.dispatch.japi$CallbackBridge.apply(Future.scala:188) ~[flink-dist_2.12-1.12.1.jar:1.12.1]
at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:60) ~[flink-dist_2.12-1.12.1.jar:1.12.1]
at org.apache.flink.runtime.concurrent.Executors$DirectExecutionContext.execute(Executors.java:73) ~[flink-dist_2.12-1.12.1.jar:1.12.1]
at scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:68) ~[flink-dist_2.12-1.12.1.jar:1.12.1]
at scala.concurrent.impl.Promise$DefaultPromise.$anonfun$tryComplete(Promise.scala:284) ~[flink-dist_2.12-1.12.1.jar:1.12.1]
at scala.concurrent.impl.Promise$DefaultPromise.$anonfun$tryComplete$adapted(Promise.scala:284) ~[flink-dist_2.12-1.12.1.jar:1.12.1]
at scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:284) ~[flink-dist_2.12-1.12.1.jar:1.12.1]
at akka.pattern.PromiseActorRef.$bang(AskSupport.scala:573) ~[flink-dist_2.12-1.12.1.jar:1.12.1]
at akka.remote.DefaultMessageDispatcher.dispatch(Endpoint.scala:101) ~[flink-dist_2.12-1.12.1.jar:1.12.1]
at akka.remote.EndpointReader$$anonfun$receive.applyOrElse(Endpoint.scala:999) ~[flink-dist_2.12-1.12.1.jar:1.12.1]
at akka.actor.Actor.aroundReceive(Actor.scala:517) ~[flink-dist_2.12-1.12.1.jar:1.12.1]
at akka.actor.Actor.aroundReceive$(Actor.scala:515) ~[flink-dist_2.12-1.12.1.jar:1.12.1]
at akka.remote.EndpointActor.aroundReceive(Endpoint.scala:458) ~[flink-dist_2.12-1.12.1.jar:1.12.1]
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592) ~[flink-dist_2.12-1.12.1.jar:1.12.1]
at akka.actor.ActorCell.invoke(ActorCell.scala:561) ~[flink-dist_2.12-1.12.1.jar:1.12.1]
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258) ~[flink-dist_2.12-1.12.1.jar:1.12.1]
at akka.dispatch.Mailbox.run(Mailbox.scala:225) ~[flink-dist_2.12-1.12.1.jar:1.12.1]
at akka.dispatch.Mailbox.exec(Mailbox.scala:235) ~[flink-dist_2.12-1.12.1.jar:1.12.1]
at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) [flink-dist_2.12-1.12.1.jar:1.12.1]
at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) [flink-dist_2.12-1.12.1.jar:1.12.1]
at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) [flink-dist_2.12-1.12.1.jar:1.12.1]
at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) [flink-dist_2.12-1.12.1.jar:1.12.1]
Caused by: org.apache.flink.runtime.rpc.exceptions.FencingTokenException: Fencing token not set: Ignoring message RemoteFencedMessage(954fe694bb4d268a2e32b4497e944144, RemoteRpcInvocation(registerTaskExecutor(TaskExecutorRegistration, Time))) sent to akka.tcp://flink@flink-jobmanager:6123/user/rpc/resourcemanager_0 because the fencing token is null.
at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:67) ~[flink-dist_2.12-1.12.1.jar:1.12.1]
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:159) ~[flink-dist_2.12-1.12.1.jar:1.12.1]
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26) ~[flink-dist_2.12-1.12.1.jar:1.12.1]
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21) ~[flink-dist_2.12-1.12.1.jar:1.12.1]
at scala.PartialFunction.applyOrElse(PartialFunction.scala:123) ~[flink-dist_2.12-1.12.1.jar:1.12.1]
at scala.PartialFunction.applyOrElse$(PartialFunction.scala:122) ~[flink-dist_2.12-1.12.1.jar:1.12.1]
at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21) ~[flink-dist_2.12-1.12.1.jar:1.12.1]
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) ~[flink-dist_2.12-1.12.1.jar:1.12.1]
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172) ~[flink-dist_2.12-1.12.1.jar:1.12.1]
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172) ~[flink-dist_2.12-1.12.1.jar:1.12.1]
at akka.actor.Actor.aroundReceive(Actor.scala:517) ~[flink-dist_2.12-1.12.1.jar:1.12.1]
at akka.actor.Actor.aroundReceive$(Actor.scala:515) ~[flink-dist_2.12-1.12.1.jar:1.12.1]
at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225) ~[flink-dist_2.12-1.12.1.jar:1.12.1]
... 9 more
2021-02-12 06:15:53,849 INFO org.apache.flink.runtime.taskexecutor.TaskExecutor [] - Pausing and re-attempting registration in 10000 ms
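The problem is that your JobManager pods need unique addresses when you run standby JobManagers. Hence, you must not configure a Service that the components use to communicate with each other. Instead, start each JobManager pod with its pod IP as its jobmanager.rpc.address.
Starting the JobManager pods
To start each JobManager pod with its own IP, you cannot put the Flink configuration in a ConfigMap, because the configuration would then be identical for every JobManager pod. Instead, add the following snippet to your JobManager deployment: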
env:
  - name: MY_POD_IP
    valueFrom:
      fieldRef:
        fieldPath: status.podIP
  - name: FLINK_PROPERTIES
    value: |
      jobmanager.rpc.address: ${MY_POD_IP}
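This tells each JobManager pod to use its own pod IP as jobmanager.rpc.address, which is also the address written to the K8s HA services. With this in place, every user of the K8s HA services running inside the K8s cluster can find the current leader.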
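To make the substitution concrete, here is a minimal Python sketch of how the ${MY_POD_IP} placeholder turns into a concrete address in the Flink configuration. It assumes the container's startup script expands ${...} placeholders in FLINK_PROPERTIES from the environment, which the snippet above implies but does not spell out. The function name render_flink_properties and the IP 10.244.1.7 are made up for illustration.

```python
from string import Template


def render_flink_properties(properties: str, env: dict) -> str:
    """Expand ${VAR} placeholders in a FLINK_PROPERTIES-style string.

    Placeholders without a matching key in `env` are left untouched.
    """
    return Template(properties).safe_substitute(env)


# Hypothetical values: each pod would see its own IP in MY_POD_IP.
flink_properties = (
    "jobmanager.rpc.address: ${MY_POD_IP}\n"
    "kubernetes.cluster-id: foobar\n"
)
rendered = render_flink_properties(flink_properties, {"MY_POD_IP": "10.244.1.7"})
print(rendered)
```

Because every pod receives a different MY_POD_IP, the same deployment template yields a different jobmanager.rpc.address per pod, which is what a shared ConfigMap cannot provide.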
Next, configure every deployment that should use the K8s HA services. You can do this by extending the FLINK_PROPERTIES environment variable:
env:
  - name: MY_POD_IP
    valueFrom:
      fieldRef:
        fieldPath: status.podIP
  - name: FLINK_PROPERTIES
    value: |
      jobmanager.rpc.address: ${MY_POD_IP}
      kubernetes.cluster-id: foobar
      high-availability: org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory
      high-availability.storageDir: hdfs:///flink/recovery
      restart-strategy: fixed-delay
      restart-strategy.fixed-delay.attempts: 10
Adding that to your JobManager pod definition, and the following to your TaskManager deployment, should solve the problem:
env:
  - name: FLINK_PROPERTIES
    value: |
      kubernetes.cluster-id: foobar
      high-availability: org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory
      high-availability.storageDir: hdfs:///flink/recovery
      restart-strategy: fixed-delay
      restart-strategy.fixed-delay.attempts: 10
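One way to check whether the change took effect is to look at the address field of the jobmanager-leader ConfigMap again: it should now hold a pod IP rather than the Service name. The Python sketch below runs that check on the ConfigMap JSON. The helper names and the 10.244.1.7 address are illustrative assumptions, not part of Flink.

```python
import ipaddress
import json
from urllib.parse import urlsplit


def leader_host(configmap_json: str) -> str:
    """Return the host part of data.address in a leader ConfigMap."""
    configmap = json.loads(configmap_json)
    address = configmap["data"]["address"]
    host = urlsplit(address).hostname
    if host is None:
        raise ValueError(f"no host found in leader address: {address!r}")
    return host


def is_ip_address(host: str) -> bool:
    """True if host is a literal IP address, False if it is a DNS name."""
    try:
        ipaddress.ip_address(host)
        return True
    except ValueError:
        return False


# Address as seen in the question (Service name), trimmed to the relevant field.
before = '{"data": {"address": "akka.tcp://flink@flink-jobmanager:6123/user/rpc/jobmanager_2"}}'
# Hypothetical address after the fix (pod IP).
after = '{"data": {"address": "akka.tcp://flink@10.244.1.7:6123/user/rpc/jobmanager_2"}}'

for label, doc in (("before", before), ("after", after)):
    host = leader_host(doc)
    print(label, host, "pod IP" if is_ip_address(host) else "DNS name")
```

If the check still reports a DNS name, the JobManager pods are most likely still picking up jobmanager.rpc.address from a shared configuration.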
The complete deployment YAML:
apiVersion: apps/v1
kind: Deployment
metadata:
  name: flink-jobmanager
spec:
  replicas: 1
  selector:
    matchLabels:
      app: flink
      component: jobmanager
  template:
    metadata:
      labels:
        app: flink
        component: jobmanager
    spec:
      containers:
        - name: jobmanager
          image: flink:1.12.1
          args: ["jobmanager"]
          env:
            - name: MY_POD_IP
              valueFrom:
                fieldRef:
                  fieldPath: status.podIP
            - name: FLINK_PROPERTIES
              value: |
                jobmanager.rpc.address: ${MY_POD_IP}
                kubernetes.cluster-id: foobar
                high-availability: org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory
                high-availability.storageDir: file:///flink/recovery
                restart-strategy: fixed-delay
                restart-strategy.fixed-delay.attempts: 10
          ports:
            - containerPort: 6123
              name: rpc
            - containerPort: 6124
              name: blob-server
            - containerPort: 8081
              name: webui
          livenessProbe:
            tcpSocket:
              port: 6123
            initialDelaySeconds: 30
            periodSeconds: 60
          securityContext:
            runAsUser: 9999 # refers to user _flink_ from official flink image, change if necessary
---
apiVersion: apps/v1
kind: Deployment
metadata:
  name: flink-taskmanager
spec:
  replicas: 2
  selector:
    matchLabels:
      app: flink
      component: taskmanager
  template:
    metadata:
      labels:
        app: flink
        component: taskmanager
    spec:
      containers:
        - name: taskmanager
          image: flink:1.12.1
          args: ["taskmanager"]
          env:
            - name: FLINK_PROPERTIES
              value: "kubernetes.cluster-id: foobar\n
                high-availability: org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory\n
                high-availability.storageDir: file:///flink/recovery\n
                restart-strategy: fixed-delay\n
                restart-strategy.fixed-delay.attempts: 10"
          ports:
            - containerPort: 6122
              name: rpc
            - containerPort: 6125
              name: query-state
            - containerPort: 6121
              name: metrics
          livenessProbe:
            tcpSocket:
              port: 6122
            initialDelaySeconds: 30
            periodSeconds: 60
          securityContext:
            runAsUser: 9999 # refers to user _flink_ from official flink image, change if necessary