Airflow: trigger Spark in different Docker container
I have both Airflow 2 (official image) and Apache Spark running in a docker-compose pipeline.
I would like to execute a DAG that triggers a Spark script by means of the SparkSubmitOperator (https://airflow.apache.org/docs/apache-airflow/1.10.14/_api/airflow/contrib/operators/spark_submit_operator/index.html), but I am failing; in the Airflow webserver I can see the following log:
*** Reading local file: /opt/airflow/logs/timetable/spark-job/2021-05-16T07:18:57.288610+00:00/1.log
[2021-05-16 07:18:58,856] {taskinstance.py:877} INFO - Dependencies all met for <TaskInstance: timetable.spark-job 2021-05-16T07:18:57.288610+00:00 [queued]>
[2021-05-16 07:18:58,906] {taskinstance.py:877} INFO - Dependencies all met for <TaskInstance: timetable.spark-job 2021-05-16T07:18:57.288610+00:00 [queued]>
[2021-05-16 07:18:58,906] {taskinstance.py:1068} INFO -
--------------------------------------------------------------------------------
[2021-05-16 07:18:58,906] {taskinstance.py:1069} INFO - Starting attempt 1 of 4
[2021-05-16 07:18:58,906] {taskinstance.py:1070} INFO -
--------------------------------------------------------------------------------
[2021-05-16 07:18:58,926] {taskinstance.py:1089} INFO - Executing <Task(SparkSubmitOperator): spark-job> on 2021-05-16T07:18:57.288610+00:00
[2021-05-16 07:18:58,941] {standard_task_runner.py:76} INFO - Running: ['airflow', 'tasks', 'run', 'timetable', 'spark-job', '2021-05-16T07:18:57.288610+00:00', '--job-id', '164', '--pool', 'default_pool', '--raw', '--subdir', 'DAGS_FOLDER/dag.py', '--cfg-path', '/tmp/tmp0opwomfp', '--error-file', '/tmp/tmpl4ctddqc']
[2021-05-16 07:18:58,935] {standard_task_runner.py:52} INFO - Started process 69 to run task
[2021-05-16 07:18:58,941] {standard_task_runner.py:77} INFO - Job 164: Subtask spark-job
[2021-05-16 07:18:59,000] {logging_mixin.py:104} INFO - Running <TaskInstance: timetable.spark-job 2021-05-16T07:18:57.288610+00:00 [running]> on host 94b160a4f0d4
[2021-05-16 07:18:59,053] {taskinstance.py:1283} INFO - Exporting the following env vars:
AIRFLOW_CTX_DAG_ID=timetable
AIRFLOW_CTX_TASK_ID=spark-job
AIRFLOW_CTX_EXECUTION_DATE=2021-05-16T07:18:57.288610+00:00
AIRFLOW_CTX_DAG_RUN_ID=manual__2021-05-16T07:18:57.288610+00:00
[2021-05-16 07:18:59,055] {base.py:78} INFO - Using connection to: id: spark_default. Host: spark, Port: 8080, Schema: , Login: None, Password: None, extra: None
[2021-05-16 07:18:59,057] {spark_submit.py:364} INFO - Spark-Submit cmd: spark-submit --master spark:8080 --name arrow-spark spark-app.py
[2021-05-16 07:18:59,145] {spark_submit.py:526} INFO - JAVA_HOME is not set
[2021-05-16 07:18:59,156] {taskinstance.py:1482} ERROR - Task failed with exception
Traceback (most recent call last):
File "/home/airflow/.local/lib/python3.6/site-packages/airflow/models/taskinstance.py", line 1138, in _run_raw_task
self._prepare_and_execute_task_with_callbacks(context, task)
File "/home/airflow/.local/lib/python3.6/site-packages/airflow/models/taskinstance.py", line 1311, in _prepare_and_execute_task_with_callbacks
result = self._execute_task(context, task_copy)
File "/home/airflow/.local/lib/python3.6/site-packages/airflow/models/taskinstance.py", line 1341, in _execute_task
result = task_copy.execute(context=context)
File "/home/airflow/.local/lib/python3.6/site-packages/airflow/providers/apache/spark/operators/spark_submit.py", line 183, in execute
self._hook.submit(self._application)
File "/home/airflow/.local/lib/python3.6/site-packages/airflow/providers/apache/spark/hooks/spark_submit.py", line 455, in submit
self._mask_cmd(spark_submit_cmd), returncode
airflow.exceptions.AirflowException: Cannot execute: spark-submit --master spark:8080 --name arrow-spark spark-app.py. Error code is: 1.
[2021-05-16 07:18:59,159] {taskinstance.py:1532} INFO - Marking task as UP_FOR_RETRY. dag_id=timetable, task_id=spark-job, execution_date=20210516T071857, start_date=20210516T071858, end_date=20210516T071859
[2021-05-16 07:18:59,196] {local_task_job.py:146} INFO - Task exited with return code 1
Since the line airflow.exceptions.AirflowException: Cannot execute: spark-submit --master spark:8080 --name arrow-spark spark-app.py. Error code is: 1. is not very informative, I do not know how to proceed. It looks as if the connection to Spark is not initialized properly. Hence my question:
How can I trigger the DAG with the SparkSubmitOperator when Spark runs in a different Docker container?
My setup:
daniel@Yoga:~/Projekte/db/airflow$ docker ps
CONTAINER ID IMAGE COMMAND CREATED STATUS PORTS NAMES
5e50523ee1ad apache/airflow:2.0.2 "/usr/bin/dumb-init …" 23 minutes ago Up 23 minutes 8080/tcp airflowWorker
1da21c7545b3 apache/airflow:2.0.2 "/usr/bin/dumb-init …" 23 minutes ago Up 23 minutes (healthy) 0.0.0.0:8081->8080/tcp, :::8081->8080/tcp airflowWebserver
0fa61a4d0ce0 apache/airflow:2.0.2 "/usr/bin/dumb-init …" 23 minutes ago Up 23 minutes (healthy) 0.0.0.0:5555->5555/tcp, :::5555->5555/tcp, 8080/tcp airflowFlower
8a09bafae90a apache/airflow:2.0.2 "/usr/bin/dumb-init …" 23 minutes ago Up 23 minutes 8080/tcp airflowScheduler
bbe5eb2111a7 postgres:13 "docker-entrypoint.s…" 23 minutes ago Up 23 minutes (healthy) 5432/tcp airflowPostgres
6b03d5411e52 redis:latest "docker-entrypoint.s…" 23 minutes ago Up 23 minutes (healthy) 0.0.0.0:6380->6379/tcp, :::6380->6379/tcp airflowRedis
23457a819731 bitnami/spark:3 "/opt/bitnami/script…" 48 minutes ago Up 48 minutes spark_worker2
b2df035e216e bitnami/spark:3 "/opt/bitnami/script…" 48 minutes ago Up 48 minutes 0.0.0.0:8080->8080/tcp, :::8080->8080/tcp spark
452d0cec2a0c bitnami/spark:3 "/opt/bitnami/script…" 48 minutes ago Up 48 minutes spark_worker1
All containers are on the same Docker network, so they should be able to reach each other without any problems.
My DAG:
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.apache.spark.operators.spark_submit import SparkSubmitOperator

default_args = {
    'owner': 'dw',
    'start_date': datetime(2021, 5, 9),
    'retries': 3,
    'retry_delay': timedelta(minutes=1)
}

dag = DAG('timetable', description='spark test', catchup=False, schedule_interval="@hourly", default_args=default_args)

s1 = SparkSubmitOperator(
    task_id="spark-job",
    application="spark-app.py",
    conn_id="spark_default",
    dag=dag
)
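Not part of the original post, but for reference: the provider's SparkSubmitOperator also accepts a name and a verbose flag, which can make the rather terse error above easier to diagnose because spark-submit is then called with --verbose. A minimal sketch of the same task with those options (a drop-in variant, not the author's code):

# Hypothetical variant of the task above; the extra arguments only affect
# naming and logging, not how the job is submitted.
from airflow.providers.apache.spark.operators.spark_submit import SparkSubmitOperator

s1 = SparkSubmitOperator(
    task_id="spark-job",
    application="spark-app.py",    # same dummy script as shown further below
    conn_id="spark_default",
    name="timetable-spark-job",    # replaces the default "arrow-spark" seen in the log
    verbose=True,                  # adds --verbose to the generated spark-submit command
    dag=dag,
)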
The conn_id is set to spark_default. This connection is configured in the Airflow webserver under Admin/Connections; the values it resolves to are also visible in the task log above (Host: spark, Port: 8080).
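For completeness (not from the original post), the same spark_default connection can also be created programmatically instead of through the Admin/Connections UI; the host and port below simply mirror the log line above ("Host: spark, Port: 8080"):

# Sketch only: register spark_default from code. Run it once inside an Airflow
# container; the values mirror the connection reported in the task log above.
from airflow import settings
from airflow.models import Connection

conn = Connection(
    conn_id="spark_default",
    conn_type="spark",
    host="spark",   # Docker service name of the Spark master container
    port=8080,
)

session = settings.Session()
# Only add the connection if it does not exist yet.
if not session.query(Connection).filter(Connection.conn_id == conn.conn_id).first():
    session.add(conn)
    session.commit()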
The script I want to execute via Spark is a very simple dummy script and lives in the same folder in which the DAG is defined:
from pyspark import SparkContext
sc = SparkContext("local", "First App")
rdd = sc.parallelize(range(10))
rdd.count()
Several similar threads deal with this problem, but I did not find a suitable solution there:
- Submit a spark job from Airflow to external spark container
- Apache Spark and Apache Airflow connection in Docker based solution
I finally managed to get it running by installing Java in the airflowWorker container, as suggested by @floating_hammer.
See How to install java in an airflow container using docker-compose.yaml for details.
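This fits the JAVA_HOME is not set line in the task log above: spark-submit on the worker needs a JVM to run. After baking Java into the image, a quick sanity check like the following (my own sketch, not part of the linked answer) can be executed inside the airflowWorker container before retrying the DAG:

# Hypothetical sanity check: confirm the worker container now has a usable JVM.
import shutil
import subprocess

java_path = shutil.which("java")
assert java_path is not None, "java is still not on PATH inside the worker container"

# Note: `java -version` prints its output to stderr, not stdout.
result = subprocess.run(["java", "-version"], capture_output=True, text=True)
print(java_path)
print(result.stderr.strip())

With a JVM available in the worker image, the spark-submit call no longer failed at this step and the DAG ran through.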