如何在气流中设置两名工人
How to setup two workers in airflow
我有两个工人和三个任务。
dag = DAG('dummy_for_testing', default_args=default_args)
t1 = BashOperator(
task_id='print_task1',
bash_command='task1.py',
dag=dag)
t2 = BashOperator(
task_id='print_task2',
bash_command='task2.py',
dag=dag)
t3 = BashOperator(
task_id='print_task3',
bash_command='task3.py',
dag=dag)
t1 >> t2 >> t3
比方说,我正在对特定文件执行 tasks(t1,t2,t3)
。目前,一切都在一个工人身上工作,但我想设置另一个工人,它将获取第一个任务的输出并执行任务 t2,然后执行任务 t3。因此,queue1
将对下一个文件执行 t1
。我怎样才能让两个工人工作。我正在考虑使用 queues
,但无法理解如何让 queue2
等到 queue1
中的任务 t1
完成。
除了启动这两个 worker 之外,您不需要做任何其他事情,它们将在可用时并在您的配置中定义的 concurrency/parallelism 限制范围内接手任务。
在您提供的示例中,任务可能 运行 完全是一个 worker 1
、worker 2
或两者的混合。这是因为 t2
在 t1
完成之前不会开始。在 t1
完成和 t2
开始之间的时间里,两个工人都将空闲(假设你没有其他 dags 运行ning)。将 t2
任务保留给 运行.
将赢得比赛
如果您需要 运行在不同的工作器上执行特定任务(比如让一个或多个工作器具有更高级别的可用资源或特殊硬件),您可以在任务级别指定队列。队列不会影响任务 运行 的顺序,因为 Airflow 调度程序将确保任务不会 运行 直到它的上游任务成功 运行。
我有两个工人和三个任务。
dag = DAG('dummy_for_testing', default_args=default_args)
t1 = BashOperator(
task_id='print_task1',
bash_command='task1.py',
dag=dag)
t2 = BashOperator(
task_id='print_task2',
bash_command='task2.py',
dag=dag)
t3 = BashOperator(
task_id='print_task3',
bash_command='task3.py',
dag=dag)
t1 >> t2 >> t3
比方说,我正在对特定文件执行 tasks(t1,t2,t3)
。目前,一切都在一个工人身上工作,但我想设置另一个工人,它将获取第一个任务的输出并执行任务 t2,然后执行任务 t3。因此,queue1
将对下一个文件执行 t1
。我怎样才能让两个工人工作。我正在考虑使用 queues
,但无法理解如何让 queue2
等到 queue1
中的任务 t1
完成。
除了启动这两个 worker 之外,您不需要做任何其他事情,它们将在可用时并在您的配置中定义的 concurrency/parallelism 限制范围内接手任务。
在您提供的示例中,任务可能 运行 完全是一个 worker 1
、worker 2
或两者的混合。这是因为 t2
在 t1
完成之前不会开始。在 t1
完成和 t2
开始之间的时间里,两个工人都将空闲(假设你没有其他 dags 运行ning)。将 t2
任务保留给 运行.
如果您需要 运行在不同的工作器上执行特定任务(比如让一个或多个工作器具有更高级别的可用资源或特殊硬件),您可以在任务级别指定队列。队列不会影响任务 运行 的顺序,因为 Airflow 调度程序将确保任务不会 运行 直到它的上游任务成功 运行。