如何让气流一次向芹菜添加数千个任务?
How to get airflow to add thousands of tasks to celery at one time?
我正在评估 Airflow 1.9.0 是否满足我们的分布式编排需求(使用 CeleryExecutor 和 RabbitMQ),我看到了一些奇怪的东西。
我做了一个分三个阶段的 dag:
- 开始
- 扇出和 运行 N 任务并发
- 完成
N 可以很大,最多可达 10K。当第 2 阶段开始时,我希望看到 N 个任务被转储到 Rabbit 队列中。相反,我一次只看到 几百 添加。随着工作人员处理任务并且队列变小,更多的工作人员被添加到 Celery/Rabbit。最终,它确实完成了,但是我真的更希望它立即将所有工作(所有 10K 任务)转储到 Celery 中,原因有两个:
当前的方式使调度程序长期存在且有状态。调度程序可能会在仅完成 5K 任务后终止,在这种情况下,将永远不会添加剩余的 5K 任务(我验证了这一点)
我想使用 Rabbit 队列的大小作为指标来触发自动缩放事件以添加更多工作人员。所以我需要一个关于还有多少未完成工作的真实情况(10K,而不是几百)
我假设调度程序有某种节流阀可以防止它同时转储所有 10K 消息?如果是这样,这是可配置的吗?
仅供参考,我已经在 airflow.cfg
中将“并行度”设置为 10K
这是我的测试狗:
# This dag tests how well airflow fans out
from airflow import DAG
from datetime import datetime, timedelta
from airflow.operators.bash_operator import BashOperator
default_args = {
'owner': 'airflow',
'depends_on_past': False,
'start_date': datetime(2015, 6, 1),
'email': ['airflow@example.com'],
'email_on_failure': False,
'email_on_retry': False,
'retries': 1,
'retry_delay': timedelta(minutes=5),
}
dag = DAG('fan_out', default_args=default_args, schedule_interval=None)
num_tasks = 10000
starting = BashOperator(
task_id='starting',
bash_command='echo starting',
dag=dag
)
all_done = BashOperator(
task_id='all_done',
bash_command='echo all done',
dag=dag)
for i in range(0, num_tasks):
task = BashOperator(
task_id='say_hello_' + str(i),
bash_command='echo hello world',
dag=dag)
task.set_upstream(starting)
task.set_downstream(all_done)
您还需要增加一些其他设置。
在 [core]
下增加 non_pooled_task_slot_count
。这将允许更多任务实际在 celery 中排队。
在 [celery
] 下增加 celeryd_concurrency
。这将增加每个工作人员同时尝试从队列中 运行 的任务数。
话虽如此,为了回应你的第一个原因......
尽管如此,如果调度程序未 运行ning,则剩余任务将不会排队,但这是因为 Airflow 调度程序设计为长期存在。当您的工作人员 运行ning 时,它应该始终是 运行ning。如果调度程序因任何原因被终止或死亡,一旦它重新启动,它将从中断的地方继续。
感谢那些建议其他并发设置的人。通过反复试验,我了解到我需要设置所有这三个:
- AIRFLOW__CORE__PARALLELISM=10000
- AIRFLOW__CORE__NON_POOLED_TASK_SLOT_COUNT=10000
- AIRFLOW__CORE__DAG_CONCURRENCY=10000
仅启用这两个,我可以达到 10K,但它非常慢,每 30 秒仅以阶梯方式突发性地添加 100 个新任务:
- AIRFLOW__CORE__PARALLELISM=10000
- AIRFLOW__CORE__NON_POOLED_TASK_SLOT_COUNT=10000
如果我只启用这两个,它是相同的 "stair-step" 模式,每 30 秒添加 128:
- AIRFLOW__CORE__PARALLELISM=10000
- AIRFLOW__CORE__DAG_CONCURRENCY=10000
但是如果我设置所有三个,它确实会一次性将 10K 添加到队列中。
我正在评估 Airflow 1.9.0 是否满足我们的分布式编排需求(使用 CeleryExecutor 和 RabbitMQ),我看到了一些奇怪的东西。
我做了一个分三个阶段的 dag:
- 开始
- 扇出和 运行 N 任务并发
- 完成
N 可以很大,最多可达 10K。当第 2 阶段开始时,我希望看到 N 个任务被转储到 Rabbit 队列中。相反,我一次只看到 几百 添加。随着工作人员处理任务并且队列变小,更多的工作人员被添加到 Celery/Rabbit。最终,它确实完成了,但是我真的更希望它立即将所有工作(所有 10K 任务)转储到 Celery 中,原因有两个:
当前的方式使调度程序长期存在且有状态。调度程序可能会在仅完成 5K 任务后终止,在这种情况下,将永远不会添加剩余的 5K 任务(我验证了这一点)
我想使用 Rabbit 队列的大小作为指标来触发自动缩放事件以添加更多工作人员。所以我需要一个关于还有多少未完成工作的真实情况(10K,而不是几百)
我假设调度程序有某种节流阀可以防止它同时转储所有 10K 消息?如果是这样,这是可配置的吗?
仅供参考,我已经在 airflow.cfg
中将“并行度”设置为 10K这是我的测试狗:
# This dag tests how well airflow fans out
from airflow import DAG
from datetime import datetime, timedelta
from airflow.operators.bash_operator import BashOperator
default_args = {
'owner': 'airflow',
'depends_on_past': False,
'start_date': datetime(2015, 6, 1),
'email': ['airflow@example.com'],
'email_on_failure': False,
'email_on_retry': False,
'retries': 1,
'retry_delay': timedelta(minutes=5),
}
dag = DAG('fan_out', default_args=default_args, schedule_interval=None)
num_tasks = 10000
starting = BashOperator(
task_id='starting',
bash_command='echo starting',
dag=dag
)
all_done = BashOperator(
task_id='all_done',
bash_command='echo all done',
dag=dag)
for i in range(0, num_tasks):
task = BashOperator(
task_id='say_hello_' + str(i),
bash_command='echo hello world',
dag=dag)
task.set_upstream(starting)
task.set_downstream(all_done)
您还需要增加一些其他设置。
在 [core]
下增加 non_pooled_task_slot_count
。这将允许更多任务实际在 celery 中排队。
在 [celery
] 下增加 celeryd_concurrency
。这将增加每个工作人员同时尝试从队列中 运行 的任务数。
话虽如此,为了回应你的第一个原因......
尽管如此,如果调度程序未 运行ning,则剩余任务将不会排队,但这是因为 Airflow 调度程序设计为长期存在。当您的工作人员 运行ning 时,它应该始终是 运行ning。如果调度程序因任何原因被终止或死亡,一旦它重新启动,它将从中断的地方继续。
感谢那些建议其他并发设置的人。通过反复试验,我了解到我需要设置所有这三个:
- AIRFLOW__CORE__PARALLELISM=10000
- AIRFLOW__CORE__NON_POOLED_TASK_SLOT_COUNT=10000
- AIRFLOW__CORE__DAG_CONCURRENCY=10000
仅启用这两个,我可以达到 10K,但它非常慢,每 30 秒仅以阶梯方式突发性地添加 100 个新任务:
- AIRFLOW__CORE__PARALLELISM=10000
- AIRFLOW__CORE__NON_POOLED_TASK_SLOT_COUNT=10000
如果我只启用这两个,它是相同的 "stair-step" 模式,每 30 秒添加 128:
- AIRFLOW__CORE__PARALLELISM=10000
- AIRFLOW__CORE__DAG_CONCURRENCY=10000
但是如果我设置所有三个,它确实会一次性将 10K 添加到队列中。