如何在气流中设置两名工人

How to setup two workers in airflow

我有两个工人和三个任务。

dag = DAG('dummy_for_testing', default_args=default_args)

t1 = BashOperator(
    task_id='print_task1',
    bash_command='task1.py',
    dag=dag)

t2 = BashOperator(
    task_id='print_task2',
    bash_command='task2.py',
    dag=dag)

t3 = BashOperator(
    task_id='print_task3',
    bash_command='task3.py',
    dag=dag)

t1 >> t2 >> t3

比方说,我正在对特定文件执行 tasks(t1,t2,t3)。目前,一切都在一个工人身上工作,但我想设置另一个工人,它将获取第一个任务的输出并执行任务 t2,然后执行任务 t3。因此,queue1 将对下一个文件执行 t1。我怎样才能让两个工人工作。我正在考虑使用 queues,但无法理解如何让 queue2 等到 queue1 中的任务 t1 完成。

除了启动这两个 worker 之外,您不需要做任何其他事情,它们将在可用时并在您的配置中定义的 concurrency/parallelism 限制范围内接手任务。

在您提供的示例中,任务可能 运行 完全是一个 worker 1worker 2 或两者的混合。这是因为 t2t1 完成之前不会开始。在 t1 完成和 t2 开始之间的时间里,两个工人都将空闲(假设你没有其他 dags 运行ning)。将 t2 任务保留给 运行.

将赢得比赛

如果您需要 运行在不同的工作器上执行特定任务(比如让一个或多个工作器具有更高级别的可用资源或特殊硬件),您可以在任务级别指定队列。队列不会影响任务 运行 的顺序,因为 Airflow 调度程序将确保任务不会 运行 直到它的上游任务成功 运行。