Kubernetes executor does not parallelize sub DAG execution in Airflow
Due to some execution limitations, we dropped the Celery Executor in Airflow 1.10.0 and are now using the KubernetesExecutor.

We are now unable to parallelize all the tasks in some DAGs, even after changing the subdag_operator directly in the code: https://github.com/apache/incubator-airflow/blob/v1-10-stable/airflow/operators/subdag_operator.py#L38

Our expectation was that with these modifications and the Kubernetes executor we could fan out the execution of all tasks at the same time, but we see the same behavior as with the SequentialExecutor.

This is the behavior we have right now:

We would like to execute all of these at the same time using the KubernetesExecutor.
The KubernetesExecutor in Airflow turns every first-level task into a worker pod that runs with the LocalExecutor. That means the LocalExecutor is what actually executes your SubDagOperator.

To run the tasks under the SubDagOperator in parallel after the worker pod is spawned, you need to specify the parallelism configuration for the worker pod. So if you are using the YAML format for the worker pod, you will need to edit it to look something like this:
apiVersion: v1
kind: Pod
metadata:
  name: dummy-name
spec:
  containers:
    - args: []
      command: []
      env:
        ###################################
        # This is the part you need to add
        ###################################
        - name: AIRFLOW__CORE__PARALLELISM
          value: "10"
        ###################################
        - name: AIRFLOW__CORE__EXECUTOR
          value: LocalExecutor
        # Hard Coded Airflow Envs
        - name: AIRFLOW__CORE__FERNET_KEY
          valueFrom:
            secretKeyRef:
              name: RELEASE-NAME-fernet-key
              key: fernet-key
        - name: AIRFLOW__CORE__SQL_ALCHEMY_CONN
          valueFrom:
            secretKeyRef:
              name: RELEASE-NAME-airflow-metadata
              key: connection
        - name: AIRFLOW_CONN_AIRFLOW_DB
          valueFrom:
            secretKeyRef:
              name: RELEASE-NAME-airflow-metadata
              key: connection
      envFrom: []
      image: dummy_image
      imagePullPolicy: IfNotPresent
      name: base
      ports: []
      volumeMounts:
        - mountPath: "/opt/airflow/logs"
          name: airflow-logs
        - mountPath: /opt/airflow/dags
          name: airflow-dags
          readOnly: false
        - mountPath: /opt/airflow/dags
          name: airflow-dags
          readOnly: true
          subPath: repo/tests/dags
  hostNetwork: false
  restartPolicy: Never
  securityContext:
    runAsUser: 50000
  nodeSelector: {}
  affinity: {}
  tolerations: []
  serviceAccountName: 'RELEASE-NAME-worker-serviceaccount'
  volumes:
    - name: dags
      persistentVolumeClaim:
        claimName: RELEASE-NAME-dags
    - emptyDir: {}
      name: airflow-logs
    - configMap:
        name: RELEASE-NAME-airflow-config
      name: airflow-config
    - configMap:
        name: RELEASE-NAME-airflow-config
      name: airflow-local-settings
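For completeness: how this pod template gets picked up depends on your Airflow version. From Airflow 1.10.11 onward you can point the KubernetesExecutor at a template file via the `[kubernetes]` section of `airflow.cfg` (a sketch; the path is an example, and earlier 1.10.x releases build the worker pod from individual `[kubernetes]` options instead):

```ini
[kubernetes]
# Example path; point this at wherever the YAML above is mounted.
pod_template_file = /opt/airflow/pod_template.yaml
```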
Then the SubDagOperator will run its tasks in parallel, up to the number specified by parallelism.
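As a rough mental model (this is not Airflow code, just an illustration of what AIRFLOW__CORE__PARALLELISM bounds), the LocalExecutor inside the worker pod behaves like a worker pool whose concurrent task slots are capped at that value:

```python
import os
from concurrent.futures import ThreadPoolExecutor

# Hypothetical stand-in for the LocalExecutor's task slots: at most
# `parallelism` sub-DAG tasks run concurrently.
os.environ.setdefault("AIRFLOW__CORE__PARALLELISM", "10")
parallelism = int(os.environ["AIRFLOW__CORE__PARALLELISM"])

def run_task(task_id):
    # Stand-in for executing one task of the sub-DAG.
    return "done-%d" % task_id

# 20 tasks are submitted, but no more than `parallelism` run at once.
with ThreadPoolExecutor(max_workers=parallelism) as pool:
    results = list(pool.map(run_task, range(20)))

print(len(results))  # 20
```

With the SequentialExecutor-like behavior described in the question, the effective cap was 1; raising parallelism in the worker pod is what allows the fan-out.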