Flink Fencing Errors in K8 HA mode

I am using Flink 1.12 and trying to run the JobManagers in HA on a Kubernetes cluster (AKS). I am running 2 JobManager and 2 TaskManager pods.

The problem I am facing is that the TaskManagers cannot find the JobManager leader.

The reason is that they try to reach the JobManager through its K8s Service (a ClusterIP Service) instead of through the leader's pod IP. As a result, the Service sometimes routes registration calls to the standby JobManager, which prevents the TaskManagers from finding the JobManager leader.

Here is the content of the jobmanager-leader ConfigMap:

{
    "apiVersion": "v1",
    "data": {
        "address": "akka.tcp://flink@flink-jobmanager:6123/user/rpc/jobmanager_2",
        "sessionId": "ee14c446-82b0-45ab-b470-ee445ddd0e0f"
    },
    "kind": "ConfigMap",
    "metadata": {
        "annotations": {
            "control-plane.alpha.kubernetes.io/leader": "{\"holderIdentity\":\"e6a42a4f-235e-4b97-93c6-40f4b987f56b\",\"leaseDuration\":15.000000000,\"acquireTime\":\"2021-02-16T05:13:37.365000Z\",\"renewTime\":\"2021-02-16T05:22:17.386000Z\",\"leaderTransitions\":105}"
        },
        "creationTimestamp": "2021-02-15T16:13:26Z",
        "labels": {
            "app": "flinktestk8cluster",
            "configmap-type": "high-availability",
            "type": "flink-native-kubernetes"
        },
        "name": "flinktestk8cluster-bc7b6f9aa8b0a111e1c50b10155a85be-jobmanager-leader",
        "namespace": "default",
        "resourceVersion": "46202881",
        "selfLink": "/api/v1/namespaces/default/configmaps/flinktestk8cluster-bc7b6f9aa8b0a111e1c50b10155a85be-jobmanager-leader",
        "uid": "1d5ca6e3-dc7e-4fb7-9fab-c1bbb956cda9"
    }
}

Here flink-jobmanager is the K8s Service name of the JobManager.
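For reference, a ClusterIP Service for this setup typically looks roughly like the sketch below (illustrative only; the ports are taken from the deployments further down). Its selector matches the leader and the standby alike, which is why registration calls can end up at the standby:

apiVersion: v1
kind: Service
metadata:
  name: flink-jobmanager
spec:
  type: ClusterIP
  ports:
  - name: rpc
    port: 6123
  - name: blob-server
    port: 6124
  - name: webui
    port: 8081
  selector:
    app: flink
    component: jobmanager  # both JobManager pods carry these labels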

Is there a way to fix this? How can I make the JobManager write the pod IP instead of the Service name into the leader file?

Here is the exception:

2021-02-12 06:15:53,849 ERROR org.apache.flink.runtime.taskexecutor.TaskExecutor           [] - Registration at ResourceManager failed due to an error
java.util.concurrent.CompletionException: org.apache.flink.runtime.rpc.exceptions.FencingTokenException: Fencing token not set: Ignoring message RemoteFencedMessage(954fe694bb4d268a2e32b4497e944144, RemoteRpcInvocation(registerTaskExecutor(TaskExecutorRegistration, Time))) sent to akka.tcp://flink@flink-jobmanager:6123/user/rpc/resourcemanager_0 because the fencing token is null.
at java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:292) ~[?:1.8.0_275]
at java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:308) ~[?:1.8.0_275]
at java.util.concurrent.CompletableFuture.uniAccept(CompletableFuture.java:661) ~[?:1.8.0_275]
at java.util.concurrent.CompletableFuture$UniAccept.tryFire(CompletableFuture.java:646) ~[?:1.8.0_275]
at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488) ~[?:1.8.0_275]
at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1990) ~[?:1.8.0_275]
at org.apache.flink.runtime.rpc.akka.AkkaInvocationHandler.lambda$invokeRpc$0(AkkaInvocationHandler.java:235) ~[flink-dist_2.12-1.12.1.jar:1.12.1]
at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774) ~[?:1.8.0_275]
at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750) ~[?:1.8.0_275]
at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488) ~[?:1.8.0_275]
at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1990) ~[?:1.8.0_275]
at org.apache.flink.runtime.concurrent.FutureUtils.onComplete(FutureUtils.java:1044) ~[flink-dist_2.12-1.12.1.jar:1.12.1]
at akka.dispatch.OnComplete.internal(Future.scala:263) ~[flink-dist_2.12-1.12.1.jar:1.12.1]
at akka.dispatch.OnComplete.internal(Future.scala:261) ~[flink-dist_2.12-1.12.1.jar:1.12.1]
at akka.dispatch.japi$CallbackBridge.apply(Future.scala:191) ~[flink-dist_2.12-1.12.1.jar:1.12.1]
at akka.dispatch.japi$CallbackBridge.apply(Future.scala:188) ~[flink-dist_2.12-1.12.1.jar:1.12.1]
at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:60) ~[flink-dist_2.12-1.12.1.jar:1.12.1]
at org.apache.flink.runtime.concurrent.Executors$DirectExecutionContext.execute(Executors.java:73) ~[flink-dist_2.12-1.12.1.jar:1.12.1]
at scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:68) ~[flink-dist_2.12-1.12.1.jar:1.12.1]
at scala.concurrent.impl.Promise$DefaultPromise.$anonfun$tryComplete(Promise.scala:284) ~[flink-dist_2.12-1.12.1.jar:1.12.1]
at scala.concurrent.impl.Promise$DefaultPromise.$anonfun$tryComplete$adapted(Promise.scala:284) ~[flink-dist_2.12-1.12.1.jar:1.12.1]
at scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:284) ~[flink-dist_2.12-1.12.1.jar:1.12.1]
at akka.pattern.PromiseActorRef.$bang(AskSupport.scala:573) ~[flink-dist_2.12-1.12.1.jar:1.12.1]
at akka.remote.DefaultMessageDispatcher.dispatch(Endpoint.scala:101) ~[flink-dist_2.12-1.12.1.jar:1.12.1]
at akka.remote.EndpointReader$$anonfun$receive.applyOrElse(Endpoint.scala:999) ~[flink-dist_2.12-1.12.1.jar:1.12.1]
at akka.actor.Actor.aroundReceive(Actor.scala:517) ~[flink-dist_2.12-1.12.1.jar:1.12.1]
at akka.actor.Actor.aroundReceive$(Actor.scala:515) ~[flink-dist_2.12-1.12.1.jar:1.12.1]
at akka.remote.EndpointActor.aroundReceive(Endpoint.scala:458) ~[flink-dist_2.12-1.12.1.jar:1.12.1]
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592) ~[flink-dist_2.12-1.12.1.jar:1.12.1]
at akka.actor.ActorCell.invoke(ActorCell.scala:561) ~[flink-dist_2.12-1.12.1.jar:1.12.1]
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258) ~[flink-dist_2.12-1.12.1.jar:1.12.1]
at akka.dispatch.Mailbox.run(Mailbox.scala:225) ~[flink-dist_2.12-1.12.1.jar:1.12.1]
at akka.dispatch.Mailbox.exec(Mailbox.scala:235) ~[flink-dist_2.12-1.12.1.jar:1.12.1]
at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) [flink-dist_2.12-1.12.1.jar:1.12.1]
at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) [flink-dist_2.12-1.12.1.jar:1.12.1]
at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) [flink-dist_2.12-1.12.1.jar:1.12.1]
at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) [flink-dist_2.12-1.12.1.jar:1.12.1]
Caused by: org.apache.flink.runtime.rpc.exceptions.FencingTokenException: Fencing token not set: Ignoring message RemoteFencedMessage(954fe694bb4d268a2e32b4497e944144, RemoteRpcInvocation(registerTaskExecutor(TaskExecutorRegistration, Time))) sent to akka.tcp://flink@flink-jobmanager:6123/user/rpc/resourcemanager_0 because the fencing token is null.
at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:67) ~[flink-dist_2.12-1.12.1.jar:1.12.1]
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:159) ~[flink-dist_2.12-1.12.1.jar:1.12.1]
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26) ~[flink-dist_2.12-1.12.1.jar:1.12.1]
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21) ~[flink-dist_2.12-1.12.1.jar:1.12.1]
at scala.PartialFunction.applyOrElse(PartialFunction.scala:123) ~[flink-dist_2.12-1.12.1.jar:1.12.1]
at scala.PartialFunction.applyOrElse$(PartialFunction.scala:122) ~[flink-dist_2.12-1.12.1.jar:1.12.1]
at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21) ~[flink-dist_2.12-1.12.1.jar:1.12.1]
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) ~[flink-dist_2.12-1.12.1.jar:1.12.1]
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172) ~[flink-dist_2.12-1.12.1.jar:1.12.1]
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172) ~[flink-dist_2.12-1.12.1.jar:1.12.1]
at akka.actor.Actor.aroundReceive(Actor.scala:517) ~[flink-dist_2.12-1.12.1.jar:1.12.1]
at akka.actor.Actor.aroundReceive$(Actor.scala:515) ~[flink-dist_2.12-1.12.1.jar:1.12.1]
at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225) ~[flink-dist_2.12-1.12.1.jar:1.12.1]
... 9 more
2021-02-12 06:15:53,849 INFO  org.apache.flink.runtime.taskexecutor.TaskExecutor           [] - Pausing and re-attempting registration in 10000 ms

The problem is that you want your JobManager pods to have unique addresses when running standby JobManagers. Hence, you must not configure the Service as the address the components use to talk to each other. Instead, you should use the pod IP as the jobmanager.rpc.address. The FencingTokenException above is a symptom of this: the registration call lands on the standby JobManager, whose ResourceManager has not been granted leadership and therefore has no fencing token set.

Starting the JobManager pods

In order to start every JobManager pod with its own IP, you cannot put jobmanager.rpc.address into a ConfigMap holding the Flink configuration, because that configuration would be identical for every JobManager pod. Instead, you need to add the following snippet to your JobManager deployment:

env:
  - name: MY_POD_IP
    valueFrom:
      fieldRef:
        fieldPath: status.podIP
  - name: FLINK_PROPERTIES
    value: |
      jobmanager.rpc.address: ${MY_POD_IP}

This way we tell every JobManager pod to use its pod IP as the jobmanager.rpc.address, and this is also the address that is written to the K8s HA services. With that, every consumer of the K8s HA services running inside the K8s cluster can find the current leader.
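If it works, the jobmanager-leader ConfigMap should then advertise the leader's pod IP instead of the Service name. A sketch of the expected data section, where 10.244.1.17 stands in for an actual pod IP:

data:
  address: akka.tcp://flink@10.244.1.17:6123/user/rpc/jobmanager_2  # pod IP instead of flink-jobmanager
  sessionId: ee14c446-82b0-45ab-b470-ee445ddd0e0f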

Next, you need to configure all deployments that should use the K8s HA services. You can do this by extending the FLINK_PROPERTIES environment variable (note that high-availability.storageDir must point to durable storage reachable by all JobManagers, such as HDFS or object storage; a plain file:/// path only works if it is backed by a shared volume):

env:
  - name: MY_POD_IP
    valueFrom:
      fieldRef:
        fieldPath: status.podIP
  - name: FLINK_PROPERTIES
    value: |
      jobmanager.rpc.address: ${MY_POD_IP}
      kubernetes.cluster-id: foobar
      high-availability: org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory
      high-availability.storageDir: hdfs:///flink/recovery
      restart-strategy: fixed-delay
      restart-strategy.fixed-delay.attempts: 10

Adding this to your JobManager pod definition and

env:
  - name: FLINK_PROPERTIES
    value: |
      kubernetes.cluster-id: foobar
      high-availability: org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory
      high-availability.storageDir: hdfs:///flink/recovery
      restart-strategy: fixed-delay
      restart-strategy.fixed-delay.attempts: 10

to your TaskManagers should do the trick.

The complete deployment yamls can be found below:

apiVersion: apps/v1
kind: Deployment
metadata:
  name: flink-jobmanager
spec:
  replicas: 1  # increase to 2 for a standby JobManager, as in the question's setup
  selector:
    matchLabels:
      app: flink
      component: jobmanager
  template:
    metadata:
      labels:
        app: flink
        component: jobmanager
    spec:
      containers:
      - name: jobmanager
        image: flink:1.12.1
        args: ["jobmanager"]
        env:
          - name: MY_POD_IP
            valueFrom:
              fieldRef:
                fieldPath: status.podIP
          - name: FLINK_PROPERTIES
            value: |
              jobmanager.rpc.address: ${MY_POD_IP}
              kubernetes.cluster-id: foobar
              high-availability: org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory
              high-availability.storageDir: file:///flink/recovery
              restart-strategy: fixed-delay
              restart-strategy.fixed-delay.attempts: 10
        ports:
        - containerPort: 6123
          name: rpc
        - containerPort: 6124
          name: blob-server
        - containerPort: 8081
          name: webui
        livenessProbe:
          tcpSocket:
            port: 6123
          initialDelaySeconds: 30
          periodSeconds: 60
        securityContext:
          runAsUser: 9999  # refers to user _flink_ from official flink image, change if necessary
---
apiVersion: apps/v1
kind: Deployment
metadata:
  name: flink-taskmanager
spec:
  replicas: 2
  selector:
    matchLabels:
      app: flink
      component: taskmanager
  template:
    metadata:
      labels:
        app: flink
        component: taskmanager
    spec:
      containers:
      - name: taskmanager
        image: flink:1.12.1
        args: ["taskmanager"]
        env:
          - name: FLINK_PROPERTIES
            value: "kubernetes.cluster-id: foobar\n
high-availability: org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory\n
high-availability.storageDir: file:///flink/recovery\n
restart-strategy: fixed-delay\n
restart-strategy.fixed-delay.attempts: 10"
        ports:
        - containerPort: 6122
          name: rpc
        - containerPort: 6125
          name: query-state
        - containerPort: 6121
          name: metrics
        livenessProbe:
          tcpSocket:
            port: 6122
          initialDelaySeconds: 30
          periodSeconds: 60
        securityContext:
          runAsUser: 9999  # refers to user _flink_ from official flink image, change if necessary
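One prerequisite the manifests above do not show: Flink's Kubernetes HA services create and update ConfigMaps (such as the jobmanager-leader ConfigMap shown earlier), so the service account the pods run under needs RBAC permissions for ConfigMaps. A minimal sketch, assuming everything runs under the default service account in the default namespace (the Role and RoleBinding names are made up for illustration):

apiVersion: rbac.authorization.k8s.io/v1
kind: Role
metadata:
  name: flink-ha-configmap-access
  namespace: default
rules:
- apiGroups: [""]
  resources: ["configmaps"]
  verbs: ["get", "list", "watch", "create", "update", "patch", "delete"]
---
apiVersion: rbac.authorization.k8s.io/v1
kind: RoleBinding
metadata:
  name: flink-ha-configmap-access
  namespace: default
subjects:
- kind: ServiceAccount
  name: default
  namespace: default
roleRef:
  kind: Role
  name: flink-ha-configmap-access
  apiGroup: rbac.authorization.k8s.io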