如何让气流一次向芹菜添加数千个任务?

How to get airflow to add thousands of tasks to celery at one time?

我正在评估 Airflow 1.9.0 是否满足我们的分布式编排需求(使用 CeleryExecutor 和 RabbitMQ),我看到了一些奇怪的东西。

我做了一个分三个阶段的 dag:

  1. 开始
  2. 扇出和 运行 N 任务并发
  3. 完成

N 可以很大,最多可达 10K。当第 2 阶段开始时,我希望看到 N 个任务被转储到 Rabbit 队列中。相反,我一次只看到 几百 添加。随着工作人员处理任务并且队列变小,更多的工作人员被添加到 Celery/Rabbit。最终,它确实完成了,但是我真的更希望它立即将所有工作(所有 10K 任务)转储到 Celery 中,原因有两个:

  1. 当前的方式使调度程序长期存在且有状态。调度程序可能会在仅完成 5K 任务后终止,在这种情况下,将永远不会添加剩余的 5K 任务(我验证了这一点)

  2. 我想使用 Rabbit 队列的大小作为指标来触发自动缩放事件以添加更多工作人员。所以我需要一个关于还有多少未完成工作的真实情况(10K,而不是几百)

我假设调度程序有某种节流阀可以防止它同时转储所有 10K 消息?如果是这样,这是可配置的吗?

仅供参考,我已经在 airflow.cfg

中将“并行度”设置为 10K

这是我的测试狗:

# This dag tests how well airflow fans out

from airflow import DAG
from datetime import datetime, timedelta

from airflow.operators.bash_operator import BashOperator

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2015, 6, 1),
    'email': ['airflow@example.com'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

dag = DAG('fan_out', default_args=default_args, schedule_interval=None)

num_tasks = 10000

starting = BashOperator(
    task_id='starting',
    bash_command='echo starting',
    dag=dag
)

all_done = BashOperator(
    task_id='all_done',
    bash_command='echo all done',
    dag=dag)

for i in range(0, num_tasks):
    task = BashOperator(
        task_id='say_hello_' + str(i),
        bash_command='echo hello world',
        dag=dag)
    task.set_upstream(starting)
    task.set_downstream(all_done)

您还需要增加一些其他设置。

[core] 下增加 non_pooled_task_slot_count。这将允许更多任务实际在 celery 中排队。

[celery] 下增加 celeryd_concurrency。这将增加每个工作人员同时尝试从队列中 运行 的任务数。

话虽如此,为了回应你的第一个原因......

尽管如此,如果调度程序未 运行ning,则剩余任务将不会排队,但这是因为 Airflow 调度程序设计为长期存在。当您的工作人员 运行ning 时,它应该始终是 运行ning。如果调度程序因任何原因被终止或死亡,一旦它重新启动,它将从中断的地方继续。

感谢那些建议其他并发设置的人。通过反复试验,我了解到我需要设置所有这三个:

 - AIRFLOW__CORE__PARALLELISM=10000
 - AIRFLOW__CORE__NON_POOLED_TASK_SLOT_COUNT=10000
 - AIRFLOW__CORE__DAG_CONCURRENCY=10000

仅启用这两个,我可以达到 10K,但它非常慢,每 30 秒仅以阶梯方式突发性地添加 100 个新任务:

 - AIRFLOW__CORE__PARALLELISM=10000
 - AIRFLOW__CORE__NON_POOLED_TASK_SLOT_COUNT=10000

如果我只启用这两个,它是相同的 "stair-step" 模式,每 30 秒添加 128:

 - AIRFLOW__CORE__PARALLELISM=10000
 - AIRFLOW__CORE__DAG_CONCURRENCY=10000

但是如果我设置所有三个,它确实会一次性将 10K 添加到队列中。