可以使用 spark 配置来配置 Beam 便携式跑步机吗?

It's possible to configure the Beam portable runner with the spark configurations?

TLDR;

可以使用 spark 配置来配置 Beam 便携式跑步机吗?更准确地说,可以在 Portable Runner?

中配置 spark.driver.host

动机

目前,我们在Kubernetes集群中实现了airflow,为了使用TensorFlow Extended我们需要使用Apache beam。对于我们的用例,Spark 将是合适的运行器,并且由于气流和 TensorFlow 在 python 中编码,我们需要使用 Apache Beam 的便携式运行器 (https://beam.apache.org/documentation/runners/spark/#portability)。

问题

便携式运行器在其容器内创建 spark 上下文,并且不会为驱动程序 DNS 配置保留 space,从而使工作程序中的执行程序 pods 无法与驱动程序(作业服务器)通信).

设置

  1. 根据 beam 文档,job serer 是在与 airflow 相同的 pod 中实现的,以使用这两个容器之间的本地网络。 作业服务器配置:
- name: beam-spark-job-server
  image: apache/beam_spark_job_server:2.27.0
  args: ["--spark-master-url=spark://spark-master:7077"]

工作server/airflow服务:

apiVersion: v1
kind: Service
metadata:
  name: airflow-scheduler
  labels:
    app: airflow-k8s
spec:
  type: ClusterIP
  selector:
    app: airflow-scheduler
  ports:
    - port: 8793
      protocol: TCP
      targetPort: 8793
      name: scheduler
    - port: 8099
      protocol: TCP
      targetPort: 8099
      name: job-server
    - port: 7077
      protocol: TCP
      targetPort: 7077
      name: spark-master
    - port: 8098
      protocol: TCP
      targetPort: 8098
      name: artifact
    - port: 8097
      protocol: TCP
      targetPort: 8097
      name: java-expansion

8097、8098、8099端口与job server相关,8793与airflow相关,7077与spark master相关。

Development/Errors

  1. 当从气流容器测试一个简单的光束示例 python -m apache_beam.examples.wordcount --output ./data_test/ --runner=PortableRunner --job_endpoint=localhost:8099 --environment_type=LOOPBACK 时,我在气流吊舱上得到以下响应:
Defaulting container name to airflow-scheduler.
Use 'kubectl describe pod/airflow-scheduler-local-f685b5bc7-9d7r6 -n airflow-main-local' to see all of the containers in this pod.
airflow@airflow-scheduler-local-f685b5bc7-9d7r6:/opt/airflow$ python -m apache_beam.examples.wordcount --output ./data_test/ --runner=PortableRunner --job_endpoint=localhost:8099 --environment_type=LOOPBACK
INFO:apache_beam.internal.gcp.auth:Setting socket default timeout to 60 seconds.
INFO:apache_beam.internal.gcp.auth:socket default timeout is 60.0 seconds.
INFO:oauth2client.client:Timeout attempting to reach GCE metadata service.
WARNING:apache_beam.internal.gcp.auth:Unable to find default credentials to use: The Application Default Credentials are not available. They are available if running in Google Compute Engine. Otherwise, the environment variable GOOGLE_APPLICATION_CREDENTIALS must be defined pointing to a file defining the credentials. See https://developers.google.com/accounts/docs/application-default-credentials for more information.
Connecting anonymously.
INFO:apache_beam.runners.worker.worker_pool_main:Listening for workers at localhost:35837
WARNING:root:Make sure that locally built Python SDK docker image has Python 3.7 interpreter.
INFO:root:Default Python SDK image for environment is apache/beam_python3.7_sdk:2.27.0
INFO:apache_beam.runners.portability.portable_runner:Environment "LOOPBACK" has started a component necessary for the execution. Be sure to run the pipeline using
  with Pipeline() as p:
    p.apply(..)
This ensures that the pipeline finishes before this program exits.
INFO:apache_beam.runners.portability.portable_runner:Job state changed to STOPPED
INFO:apache_beam.runners.portability.portable_runner:Job state changed to STARTING
INFO:apache_beam.runners.portability.portable_runner:Job state changed to RUNNING

和工人日志:

21/02/19 19:50:00 INFO Worker: Asked to launch executor app-20210219194804-0000/47 for BeamApp-root-0219194747-7d7938cf_51452c51-dffe-4c61-bcb7-60c7779e3256
21/02/19 19:50:00 INFO SecurityManager: Changing view acls to: root
21/02/19 19:50:00 INFO SecurityManager: Changing modify acls to: root
21/02/19 19:50:00 INFO SecurityManager: Changing view acls groups to: 
21/02/19 19:50:00 INFO SecurityManager: Changing modify acls groups to: 
21/02/19 19:50:00 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users  with view permissions: Set(root); groups with view permissions: Set(); users  with modify permissions: Set(root); groups with modify permissions: Set()
21/02/19 19:50:00 INFO ExecutorRunner: Launch command: "/usr/local/openjdk-8/bin/java" "-cp" "/opt/spark/conf/:/opt/spark/jars/*" "-Xmx1024M" "-Dspark.driver.port=44447" "org.apache.spark.executor.CoarseGrainedExecutorBackend" "--driver-url" "spark://CoarseGrainedScheduler@airflow-scheduler-local-f685b5bc7-9d7r6:44447" "--executor-id" "47" "--hostname" "172.18.0.3" "--cores" "1" "--app-id" "app-20210219194804-0000" "--worker-url" "spark://Worker@172.18.0.3:35837"
21/02/19 19:50:02 INFO Worker: Executor app-20210219194804-0000/47 finished with state EXITED message Command exited with code 1 exitStatus 1
21/02/19 19:50:02 INFO ExternalShuffleBlockResolver: Clean up non-shuffle files associated with the finished executor 47
21/02/19 19:50:02 INFO ExternalShuffleBlockResolver: Executor is not registered (appId=app-20210219194804-0000, execId=47)
21/02/19 19:50:02 INFO Worker: Asked to launch executor app-20210219194804-0000/48 for BeamApp-root-0219194747-7d7938cf_51452c51-dffe-4c61-bcb7-60c7779e3256
21/02/19 19:50:02 INFO SecurityManager: Changing view acls to: root
21/02/19 19:50:02 INFO SecurityManager: Changing modify acls to: root
21/02/19 19:50:02 INFO SecurityManager: Changing view acls groups to: 
21/02/19 19:50:02 INFO SecurityManager: Changing modify acls groups to: 
21/02/19 19:50:02 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users  with view permissions: Set(root); groups with view permissions: Set(); users  with modify permissions: Set(root); groups with modify permissions: Set()
21/02/19 19:50:02 INFO ExecutorRunner: Launch command: "/usr/local/openjdk-8/bin/java" "-cp" "/opt/spark/conf/:/opt/spark/jars/*" "-Xmx1024M" "-Dspark.driver.port=44447" "org.apache.spark.executor.CoarseGrainedExecutorBackend" "--driver-url" "spark://CoarseGrainedScheduler@airflow-scheduler-local-f685b5bc7-9d7r6:44447" "--executor-id" "48" "--hostname" "172.18.0.3" "--cores" "1" "--app-id" "app-20210219194804-0000" "--worker-url" "spark://Worker@172.18.0.3:35837"
21/02/19 19:50:04 INFO Worker: Executor app-20210219194804-0000/48 finished with state EXITED message Command exited with code 1 exitStatus 1
21/02/19 19:50:04 INFO ExternalShuffleBlockResolver: Clean up non-shuffle files associated with the finished executor 48
21/02/19 19:50:04 INFO ExternalShuffleBlockResolver: Executor is not registered (appId=app-20210219194804-0000, execId=48)
21/02/19 19:50:04 INFO Worker: Asked to launch executor app-20210219194804-0000/49 for BeamApp-root-0219194747-7d7938cf_51452c51-dffe-4c61-bcb7-60c7779e3256
21/02/19 19:50:04 INFO SecurityManager: Changing view acls to: root
21/02/19 19:50:04 INFO SecurityManager: Changing modify acls to: root
21/02/19 19:50:04 INFO SecurityManager: Changing view acls groups to: 
21/02/19 19:50:04 INFO SecurityManager: Changing modify acls groups to: 
21/02/19 19:50:04 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users  with view permissions: Set(root); groups with view permissions: Set(); users  with modify permissions: Set(root); groups with modify permissions: Set()
21/02/19 19:50:04 INFO ExecutorRunner: Launch command: "/usr/local/openjdk-8/bin/java" "-cp" "/opt/spark/conf/:/opt/spark/jars/*" "-Xmx1024M" "-Dspark.driver.port=44447" "org.apache.spark.executor.CoarseGrainedExecutorBackend" "--driver-url" "spark://CoarseGrainedScheduler@airflow-scheduler-local-f685b5bc7-9d7r6:44447" "--executor-id" "49" "--hostname" "172.18.0.3" "--cores" "1" "--app-id" "app-20210219194804-0000" "--worker-url" "spark://Worker@172.18.0.3:35837"
.
.
.

正如我们所见,执行程序不断退出,据我所知,这个问题是由执行程序和驱动程序(在本例中为作业服务器)之间缺少通信造成的。此外,“--driver-url”使用随机端口“-Dspark.driver.port”转换为驱动程序 pod 名称。 由于我们无法定义服务的名称,工作人员尝试使用驱动程序的原始名称并使用随机生成的端口。由于配置来自驱动程序,更改 worker/master 中的默认配置文件不会产生任何结果。 以 this answer 为例,我尝试在作业服务器中使用环境变量 SPARK_PUBLIC_DNS 但这并没有导致工作日志发生任何变化。

观察

直接在kubernetes中使用spark job kubectl run spark-base --rm -it --labels="app=spark-client" --image bde2020/spark-base:2.4.5-hadoop2.7 -- bash ./spark/bin/pyspark --master spark://spark-master:7077 --conf spark.driver.host=spark-client 有服务:

apiVersion: v1
kind: Service
metadata:
  name: spark-client
spec:
  selector:
    app: spark-client
  clusterIP: None

我得到了一个完整的 pyspark shell。如果我省略 --conf 参数,我会得到与第一个设置相同的行为(无限期退出执行程序)

21/02/19 20:21:02 INFO Worker: Executor app-20210219202050-0002/4 finished with state EXITED message Command exited with code 1 exitStatus 1
21/02/19 20:21:02 INFO ExternalShuffleBlockResolver: Clean up non-shuffle files associated with the finished executor 4
21/02/19 20:21:02 INFO ExternalShuffleBlockResolver: Executor is not registered (appId=app-20210219202050-0002, execId=4)
21/02/19 20:21:02 INFO Worker: Asked to launch executor app-20210219202050-0002/5 for Spark shell
21/02/19 20:21:02 INFO SecurityManager: Changing view acls to: root
21/02/19 20:21:02 INFO SecurityManager: Changing modify acls to: root
21/02/19 20:21:02 INFO SecurityManager: Changing view acls groups to: 
21/02/19 20:21:02 INFO SecurityManager: Changing modify acls groups to: 
21/02/19 20:21:02 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users  with view permissions: Set(root); groups with view permissions: Set(); users  with modify permissions: Set(root); groups with modify permissions: Set()
21/02/19 20:21:02 INFO ExecutorRunner: Launch command: "/usr/local/openjdk-8/bin/java" "-cp" "/opt/spark/conf/:/opt/spark/jars/*" "-Xmx1024M" "-Dspark.driver.port=46161" "org.apache.spark.executor.CoarseGrainedExecutorBackend" "--driver-url" "spark://CoarseGrainedScheduler@spark-base:46161" "--executor-id" "5" "--hostname" "172.18.0.20" "--cores" "1" "--app-id" "app-20210219202050-0002" "--worker-url" "spark://Worker@172.18.0.20:45151"

根据您的部署要求,我提供了三种解决方案供您选择。按难度排序:

  1. 使用 Spark“uber jar”作业服务器。这会在 Spark master 中启动一个嵌入式作业服务器,而不是在容器中使用独立的作业服务器。这将大大简化您的部署,因为您根本不需要启动 beam_spark_job_server 容器。
python -m apache_beam.examples.wordcount \
--output ./data_test/ \
--runner=SparkRunner \
--spark_submit_uber_jar \
--spark_master_url=spark://spark-master:7077 \
--environment_type=LOOPBACK
  1. 您可以通过 Spark 配置文件传递属性。创建 Spark 配置文件,并添加 spark.driver.host 和您需要的任何其他属性。在作业服务器的 docker run 命令中,将该配置文件挂载到容器,并将 SPARK_CONF_DIR 环境变量设置为指向该目录。

  2. 如果这些都不适合您,您也可以构建自己的自定义版本的作业服务器容器。从 Github 中拉出光束源。查看您要使用的发布分支(例如 git checkout origin/release-2.28.0)。修改入口点 spark-job-server.sh 并在那里设置 -Dspark.driver.host=x。然后使用 ./gradlew :runners:spark:job-server:container:docker -Pdocker-repository-root="your-repo" -Pdocker-tag="your-tag".

    构建容器

让我修改一下答案。 Job 服务器需要能够与 workers 进行通信,反之亦然。 keep exiting的错误就是因为这个。您需要配置以便它们可以通信。能够解决这个问题的 k8s 无头服务。

参考 https://github.com/cometta/python-apache-beam-spark 中的可行示例。如果对你有用,可以帮我'Star'资源库