Kubernetes executor does not parallelize sub-DAG execution in Airflow

Due to some execution limitations, we dropped the Celery Executor in Airflow 1.10.0 and are now using the KubernetesExecutor.

Now we are unable to parallelize all the tasks in certain DAGs, even after directly changing the subdag_operator in the code: https://github.com/apache/incubator-airflow/blob/v1-10-stable/airflow/operators/subdag_operator.py#L38

Our expectation was that, with these modifications and the Kubernetes executor, we could fan out the execution of all tasks at the same time, but we get the same behavior as with the SequentialExecutor.

This is the behavior we see now:

We want to execute all of these simultaneously using the KubernetesExecutor.

The Kubernetes Executor in Airflow turns every first-level task into a worker pod that runs the Local Executor.

That means your SubDagOperator is executed by the Local Executor inside that worker pod.

To run the tasks under the SubDagOperator in parallel once the worker pod has been spawned, you need to set the parallelism configuration for the worker pod. So if you are using a YAML pod spec for the worker pod, edit it to look something like this.

apiVersion: v1
kind: Pod
metadata:
  name: dummy-name
spec:
  containers:
    - args: []
      command: []
      env:
        ###################################
        # This is the part you need to add
        ###################################
        - name: AIRFLOW__CORE__PARALLELISM
          value: "10"
        ###################################
        - name: AIRFLOW__CORE__EXECUTOR
          value: LocalExecutor
        # Hard Coded Airflow Envs
        - name: AIRFLOW__CORE__FERNET_KEY
          valueFrom:
            secretKeyRef:
              name: RELEASE-NAME-fernet-key
              key: fernet-key
        - name: AIRFLOW__CORE__SQL_ALCHEMY_CONN
          valueFrom:
            secretKeyRef:
              name: RELEASE-NAME-airflow-metadata
              key: connection
        - name: AIRFLOW_CONN_AIRFLOW_DB
          valueFrom:
            secretKeyRef:
              name: RELEASE-NAME-airflow-metadata
              key: connection
      envFrom: []
      image: dummy_image
      imagePullPolicy: IfNotPresent
      name: base
      ports: []
      volumeMounts:
        - mountPath: "/opt/airflow/logs"
          name: airflow-logs
        - mountPath: /opt/airflow/dags
          name: airflow-dags
          readOnly: false
        - mountPath: /opt/airflow/dags
          name: airflow-dags
          readOnly: true
          subPath: repo/tests/dags
  hostNetwork: false
  restartPolicy: Never
  securityContext:
    runAsUser: 50000
  nodeSelector:
    {}
  affinity:
    {}
  tolerations:
    []
  serviceAccountName: 'RELEASE-NAME-worker-serviceaccount'
  volumes:
    - name: airflow-dags
      persistentVolumeClaim:
        claimName: RELEASE-NAME-dags
    - emptyDir: {}
      name: airflow-logs
    - configMap:
        name: RELEASE-NAME-airflow-config
      name: airflow-config
    - configMap:
        name: RELEASE-NAME-airflow-config
      name: airflow-local-settings

The SubDagOperator will then run its tasks in parallel, up to the limit specified by parallelism.
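For reference, on Airflow 1.10.11 and later the worker pod spec can be supplied via the `pod_template_file` option under the `[kubernetes]` section instead of the legacy YAML setting; the path below is just an illustrative placeholder, so point it at wherever your template is actually mounted:

```ini
[kubernetes]
# Hypothetical path; use the location of the YAML spec shown above
pod_template_file = /opt/airflow/pod_templates/worker_pod_template.yaml
```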