Airflow DAG 任务在不同工作节点上的并行性
Airflow DAG tasks parallelism on different worker nodes
我有一个由 3 个工作节点组成的 Airflow 集群,使用 CeleryExecutor 和 RabbitMQ 进行通信。
我的 DAG 通常由下载文件、解压缩文件、将它们上传到 hadoop 等任务组成。因此它们相互依赖,并且必须 运行 单个 machine/node。
当气流调度单个 DAG 的这些任务时,在不同的节点上,我最终会遇到错误,因为这些任务是在不同的机器上调度的,但我需要一个 DAG 中的所有任务安排在一台机器上。
我尝试在 airflow.cfg 和初始化 dag 时设置 dag_concurrency = 1 和 max_active_runs_per_dag = 1,DAG(concurrency = 1, max_active_runs = 1)没有成功。
我的其他airflow.cfg:
parallelism = 32
dag_concurrency = 1
worker_concurrency = 16
max_active_runs_per_dag = 16
据我所知,将 dag_concurrency 设置为 1 应该可以解决问题,但我在这里缺少什么?
CeleryExecutor
支持多个 queues
,您可以为每个操作员定义一个特定的队列(是 BaseOperator
的属性),然后为每个工作人员订阅该特定队列。请注意,worker 可以监听一个或多个队列。
来自docs:
Workers can listen to one or multiple queues of tasks. When a worker
is started (using the command airflow celery worker), a set of
comma-delimited queue names can be specified (e.g. airflow celery
worker -q spark). This worker will then only pick up tasks wired to
the specified queue(s)
这是一个 DAG 示例:
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.utils.dates import days_ago
default_args = {
'owner': 'airflow',
'depends_on_past': False,
'start_date': days_ago(1),
}
dag = DAG('dist_example',
schedule_interval='@once',
catchup=False,
default_args=default_args
)
get_hostname = 'echo $(hostname)'
t1 = BashOperator(
task_id='task_for_q1',
bash_command=get_hostname,
queue='queue_1',
dag=dag
)
t2 = BashOperator(
task_id='task_for_q2',
bash_command=get_hostname,
queue='queue_2',
dag=dag
)
t1 >> t2
worker_1:
airflow celery worker -q default,queue_1
worker_2:
airflow celery worker -q default,queue_2
通过监听您的特定队列和 default
(由 default_queue
配置键定义),您不会影响任何其他任务的标准多线程行为。
希望对你有用!
我有一个由 3 个工作节点组成的 Airflow 集群,使用 CeleryExecutor 和 RabbitMQ 进行通信。 我的 DAG 通常由下载文件、解压缩文件、将它们上传到 hadoop 等任务组成。因此它们相互依赖,并且必须 运行 单个 machine/node。
当气流调度单个 DAG 的这些任务时,在不同的节点上,我最终会遇到错误,因为这些任务是在不同的机器上调度的,但我需要一个 DAG 中的所有任务安排在一台机器上。
我尝试在 airflow.cfg 和初始化 dag 时设置 dag_concurrency = 1 和 max_active_runs_per_dag = 1,DAG(concurrency = 1, max_active_runs = 1)没有成功。
我的其他airflow.cfg:
parallelism = 32
dag_concurrency = 1
worker_concurrency = 16
max_active_runs_per_dag = 16
据我所知,将 dag_concurrency 设置为 1 应该可以解决问题,但我在这里缺少什么?
CeleryExecutor
支持多个 queues
,您可以为每个操作员定义一个特定的队列(是 BaseOperator
的属性),然后为每个工作人员订阅该特定队列。请注意,worker 可以监听一个或多个队列。
来自docs:
Workers can listen to one or multiple queues of tasks. When a worker is started (using the command airflow celery worker), a set of comma-delimited queue names can be specified (e.g. airflow celery worker -q spark). This worker will then only pick up tasks wired to the specified queue(s)
这是一个 DAG 示例:
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.utils.dates import days_ago
default_args = {
'owner': 'airflow',
'depends_on_past': False,
'start_date': days_ago(1),
}
dag = DAG('dist_example',
schedule_interval='@once',
catchup=False,
default_args=default_args
)
get_hostname = 'echo $(hostname)'
t1 = BashOperator(
task_id='task_for_q1',
bash_command=get_hostname,
queue='queue_1',
dag=dag
)
t2 = BashOperator(
task_id='task_for_q2',
bash_command=get_hostname,
queue='queue_2',
dag=dag
)
t1 >> t2
worker_1:
airflow celery worker -q default,queue_1
worker_2:
airflow celery worker -q default,queue_2
通过监听您的特定队列和 default
(由 default_queue
配置键定义),您不会影响任何其他任务的标准多线程行为。
希望对你有用!