Airflow DAG 任务在不同工作节点上的并行性

Airflow DAG tasks parallelism on different worker nodes

我有一个由 3 个工作节点组成的 Airflow 集群,使用 CeleryExecutor 和 RabbitMQ 进行通信。 我的 DAG 通常由下载文件、解压缩文件、将它们上传到 hadoop 等任务组成。因此它们相互依赖,并且必须 运行 单个 machine/node。

当气流调度单个 DAG 的这些任务时,在不同的节点上,我最终会遇到错误,因为这些任务是在不同的机器上调度的,但我需要一个 DAG 中的所有任务安排在一台机器上。

我尝试在 airflow.cfg 和初始化 dag 时设置 dag_concurrency = 1 和 max_active_runs_per_dag = 1,DAG(concurrency = 1, max_active_runs = 1)没有成功。

我的其他airflow.cfg:

parallelism = 32
dag_concurrency = 1
worker_concurrency = 16
max_active_runs_per_dag = 16

据我所知,将 dag_concurrency 设置为 1 应该可以解决问题,但我在这里缺少什么?

CeleryExecutor 支持多个 queues,您可以为每个操作员定义一个特定的队列(是 BaseOperator 的属性),然后为每个工作人员订阅该特定队列。请注意,worker 可以监听一个或多个队列。

来自docs

Workers can listen to one or multiple queues of tasks. When a worker is started (using the command airflow celery worker), a set of comma-delimited queue names can be specified (e.g. airflow celery worker -q spark). This worker will then only pick up tasks wired to the specified queue(s)

这是一个 DAG 示例:

from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.utils.dates import days_ago


default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': days_ago(1),
}
dag = DAG('dist_example',
          schedule_interval='@once',
          catchup=False,
          default_args=default_args
          )
get_hostname = 'echo $(hostname)'

t1 = BashOperator(
    task_id='task_for_q1',
    bash_command=get_hostname,
    queue='queue_1',
    dag=dag
)
t2 = BashOperator(
    task_id='task_for_q2',
    bash_command=get_hostname,
    queue='queue_2',
    dag=dag
)
t1 >> t2

worker_1: airflow celery worker -q default,queue_1

worker_2: airflow celery worker -q default,queue_2

通过监听您的特定队列和 default(由 default_queue 配置键定义),您不会影响任何其他任务的标准多线程行为。

希望对你有用!