Airflow:如何 运行 多个工作人员执行一项任务

Airflow: How to run a task on multiple workers

我刚刚用芹菜执行器设置了气流,这是我的 DAG 的框架

dag = DAG('dummy_for_testing', default_args=default_args)

t1 = BashOperator(
    task_id='print_date',
    bash_command='date >> /tmp/dag_output.log',
    queue='test_queue',
    dag=dag)

t3 = BashOperator(
    task_id='print_host',
    bash_command='hostname >> /tmp/dag_output.log',
    queue='local_queue',
    dag=dag)

t2 = BashOperator(
    task_id='print_uptime',
    bash_command='uptime >> /tmp/dag_output.log',
    queue='local_queue',
    dag=dag)

t2.set_upstream(t3)
t2.set_upstream(t1)

我有2个工人。其中一个 运行 只有一个名为 local_queue 的队列,另一个 运行 有两个名为 local_queue,test_queue

的队列

我想 运行 只在一台机器上执行任务 1,但在两台机器上执行任务 2 和 3。即在工人 1 运行 上只 local_queue,t2 和 t3 应该 运行 而在工人 2 运行 上 local_queue 和 test_queue 全部 3 (t1,t2 和 t3) 应该 运行。 任务总数 运行s 应该是 5.

然而,当我运行这样的时候,只有3个任务是运行。 1) print_date 是工人 2 的 运行(这是正确的) 2) print_host 仅对工人 1 是 运行(不正确。两个工人都应该 运行)和 3) print_uptime 仅对工人 2 是 运行(也不正确。两个工人都应该 运行)

能否请您指导我如何进行设置,使 5 个任务成为 运行。在生产中,我想通过将机器分组到队列中来管理机器,对于所有具有 QUEUE_A -> 做 X 的机器和所有具有 QUEUE_B -> 做 Y 等的机器

谢谢

不是让一个工作人员处理 2 个队列,而是让每个工作人员处理一个队列。 所以工作命令应该是这样的:

airflow worker -q test_queue
airflow worker -q local_queue

然后有两个相同的任务,但在不同的队列中。

dag = DAG('dummy_for_testing', default_args=default_args)

t1 = BashOperator(
    task_id='print_date',
    bash_command='date >> /tmp/dag_output.log',
    queue='test_queue',
    dag=dag)

t3 = BashOperator(
    task_id='print_host',
    bash_command='hostname >> /tmp/dag_output.log',
    queue='local_queue',
    dag=dag)

t3_2 = BashOperator(
    task_id='print_host_2',
    bash_command='hostname >> /tmp/dag_output.log',
    queue='test_queue',
    dag=dag)

t2 = BashOperator(
    task_id='print_uptime',
    bash_command='uptime >> /tmp/dag_output.log',
    queue='local_queue',
    dag=dag)

t2_2 = BashOperator(
    task_id='print_uptime_2',
    bash_command='uptime >> /tmp/dag_output.log',
    queue='test_queue',
    dag=dag)

t2.set_upstream(t3)
t2.set_upstream(t3_2)
t2.set_upstream(t1)

t2_2.set_upstream(t3)
t2_2.set_upstream(t3_2)
t2_2.set_upstream(t1)