TFX/Apache Beam -> Flink jobs hang when running on more than one task manager

When I try to run a TFX pipeline / Apache Beam job on a Flink runner, it works fine with one task manager (on one node) and parallelism of 2 (2 task slots per task manager). But it hangs when I try it with a higher parallelism on more than one task manager, with this message repeating on both task managers:

INFO org.apache.beam.runners.fnexecution.environment.ExternalEnvironmentFactory [] - Still waiting for startup of environment from a65a0c5f8f962428897aac40763e57b0-1334930809.eu-central-1.elb.amazonaws.com:50000 for worker id 1-1

The Flink cluster runs as a native Kubernetes deployment on an AWS EKS Kubernetes cluster.

I am using the following parameters:

        "--runner=FlinkRunner",
        "--parallelism=4",
        f"--flink_master={flink_url}:8081",
        "--environment_type=EXTERNAL",
        f"--environment_config={beam_sdk_url}:50000",
        "--flink_submit_uber_jar",
        "--worker_harness_container_image=none",
    
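For reference, these flags can be assembled as a plain Python list and handed to the pipeline (e.g. as `beam_pipeline_args` on a TFX pipeline); a minimal sketch, where `flink_url` and `beam_sdk_url` are placeholder host names, not the actual deployment's:

```python
# Hypothetical addresses; substitute the real jobmanager / load balancer hosts.
flink_url = "my-flink-jobmanager.example.com"
beam_sdk_url = "my-beam-workers.example.com"

# The same flags as above, as a list of pipeline arguments.
beam_pipeline_args = [
    "--runner=FlinkRunner",
    "--parallelism=4",
    f"--flink_master={flink_url}:8081",
    "--environment_type=EXTERNAL",
    f"--environment_config={beam_sdk_url}:50000",
    "--flink_submit_uber_jar",
    "--worker_harness_container_image=none",
]
```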

Edit: Adding additional information about the configuration.

I have configured the Beam workers to run as side-cars to the task managers (at least that is my understanding of how it should work), by setting the Flink option:

kubernetes.pod-template-file.taskmanager

which points to a template file with the following content:

apiVersion: v1
kind: Pod
metadata:
  name: taskmanager-pod-template
spec:
  #hostNetwork: true
  containers:
    - name: flink-main-container
      #image: apache/flink:scala_2.12
      env:
        - name: AWS_REGION
          value: "eu-central-1"
        - name: S3_VERIFY_SSL
          value: "0"
        - name: PYTHONPATH
          value: "/data/flink/src"
      args: ["taskmanager"]
      ports:
        - containerPort: 6122 #22
          name: rpc
        - containerPort: 6125
          name: query-state
      livenessProbe:
        tcpSocket:
          port: 6122 #22
        initialDelaySeconds: 30
        periodSeconds: 60
    - name: beam-worker-pool
      env:
        - name: PYTHONPATH
          value: "/data/flink/src"
        - name: AWS_REGION
          value: "eu-central-1"
        - name: S3_VERIFY_SSL
          value: "0"
      image: 848221505146.dkr.ecr.eu-central-1.amazonaws.com/flink-workers
      imagePullPolicy: Always
      args: ["--worker_pool"]
      ports:
        - containerPort: 50000
          name: pool
      livenessProbe:
        tcpSocket:
          port: 50000
        initialDelaySeconds: 30
        periodSeconds: 60
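For completeness, the pod-template option named above is set in the Flink configuration and points at wherever the template file is available to Flink; a sketch, where the path is an example and not the actual deployment's:

```yaml
# flink-conf.yaml (the file path below is a placeholder)
kubernetes.pod-template-file.taskmanager: /opt/flink/conf/taskmanager-pod-template.yaml
```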

I also created a Kubernetes load balancer for the task managers, so that clients can connect to them on port 50000. That is the address I use in the configuration:

f"--environment_config={beam_sdk_url}:50000",
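That load balancer is a standard Kubernetes Service in front of the task manager pods; a hypothetical sketch of such a manifest (the name and selector labels are assumptions, not the actual deployment's):

```yaml
apiVersion: v1
kind: Service
metadata:
  name: beam-workers  # hypothetical name
spec:
  type: LoadBalancer
  selector:
    component: taskmanager  # must match the task manager pods' labels
  ports:
    - name: pool
      port: 50000
      targetPort: 50000
```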

Edit 2: It looks like the Beam SDK harness on one task manager wants to connect to the endpoint running on the other task manager, but looks for it on localhost:

Log from beam-worker-pool on TM 2:

2021/08/11 09:43:16 Failed to obtain provisioning information: failed to dial server at localhost:33705
    caused by:
context deadline exceeded

The provisioning endpoint on TM 1 is the one actually listening on port 33705, while the harness looks for it on localhost and therefore cannot connect to it.

Edit 3: Showing how I am testing this:

...............

TM 1:
========
$ kubectl logs my-first-flink-cluster-taskmanager-1-1 -c beam-worker-pool
2021/08/12 09:10:34 Starting worker pool 1: python -m apache_beam.runners.worker.worker_pool_main --service_port=50000 --container_executable=/opt/apache/beam/boot
Starting worker with command ['/opt/apache/beam/boot', '--id=1-1', '--logging_endpoint=localhost:33383', '--artifact_endpoint=localhost:43477', '--provision_endpoint=localhost:40983', '--control_endpoint=localhost:34793']
2021/08/12 09:13:05 Failed to obtain provisioning information: failed to dial server at localhost:40983
    caused by:
context deadline exceeded

TM 2:
=========
$ kubectl logs my-first-flink-cluster-taskmanager-1-2 -c beam-worker-pool
2021/08/12 09:10:33 Starting worker pool 1: python -m apache_beam.runners.worker.worker_pool_main --service_port=50000 --container_executable=/opt/apache/beam/boot
Starting worker with command ['/opt/apache/beam/boot', '--id=1-1', '--logging_endpoint=localhost:40497', '--artifact_endpoint=localhost:36245', '--provision_endpoint=localhost:32907', '--control_endpoint=localhost:46083']
2021/08/12 09:13:09 Failed to obtain provisioning information: failed to dial server at localhost:32907
    caused by:
context deadline exceeded

Testing:
.........................

TM 1:
============
$ kubectl exec -it my-first-flink-cluster-taskmanager-1-1 -c beam-worker-pool -- bash
root@my-first-flink-cluster-taskmanager-1-1:/# curl localhost:40983
curl: (7) Failed to connect to localhost port 40983: Connection refused

root@my-first-flink-cluster-taskmanager-1-1:/# curl localhost:32907
Warning: Binary output can mess up your terminal. Use "--output -" to ...


TM 2:
=============
root@my-first-flink-cluster-taskmanager-1-2:/# curl localhost:32907
curl: (7) Failed to connect to localhost port 32907: Connection refused

root@my-first-flink-cluster-taskmanager-1-2:/# curl localhost:40983
Warning: Binary output can mess up your terminal. Use "--output -" to tell
Warning: curl to output it to your terminal anyway, or consider "--output

I am not sure how to fix this.

Thanks, Gorjan

Trying to connect to the same environment from different task managers is not expected to work. Generally we recommend setting up the Beam workers as side-cars to the task managers, so that there is 1:1 communication, and then connecting over localhost. See the example configurations at https://github.com/GoogleCloudPlatform/flink-on-k8s-operator/blob/master/examples/beam/without_job_server/beam_flink_cluster.yaml and https://github.com/GoogleCloudPlatform/flink-on-k8s-operator/blob/master/examples/beam/without_job_server/beam_wordcount_py.yaml

I was able to fix this by setting the Beam SDK address to localhost instead of using the load balancer. So the configuration I am using now is:

        "--runner=FlinkRunner",
        "--parallelism=4",
        f"--flink_master={flink_url}:8081",
        "--environment_type=EXTERNAL",
        "--environment_config=localhost:50000", # <--- Changed the address to localhost
        "--flink_submit_uber_jar",
        "--worker_harness_container_image=none",
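With the side-car setup, each task manager dials the worker pool inside its own pod, so the endpoints the pool hands back on localhost are actually reachable. The corrected options as a Python list (a sketch; `flink_url` is a placeholder host name):

```python
flink_url = "my-flink-jobmanager.example.com"  # hypothetical jobmanager address

beam_pipeline_args = [
    "--runner=FlinkRunner",
    "--parallelism=4",
    f"--flink_master={flink_url}:8081",
    "--environment_type=EXTERNAL",
    # localhost works because the beam-worker-pool container runs as a
    # side-car in the same pod as each task manager.
    "--environment_config=localhost:50000",
    "--flink_submit_uber_jar",
    "--worker_harness_container_image=none",
]
```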