Airflow:如何 运行 多个工作人员执行一项任务
Airflow: How to run a task on multiple workers
我刚刚用芹菜执行器设置了气流,这是我的 DAG 的框架
dag = DAG('dummy_for_testing', default_args=default_args)
t1 = BashOperator(
task_id='print_date',
bash_command='date >> /tmp/dag_output.log',
queue='test_queue',
dag=dag)
t3 = BashOperator(
task_id='print_host',
bash_command='hostname >> /tmp/dag_output.log',
queue='local_queue',
dag=dag)
t2 = BashOperator(
task_id='print_uptime',
bash_command='uptime >> /tmp/dag_output.log',
queue='local_queue',
dag=dag)
t2.set_upstream(t3)
t2.set_upstream(t1)
我有2个工人。其中一个 运行 只有一个名为 local_queue
的队列,另一个 运行 有两个名为 local_queue,test_queue
的队列
我想 运行 只在一台机器上执行任务 1,但在两台机器上执行任务 2 和 3。即在工人 1 运行 上只 local_queue,t2 和 t3 应该 运行 而在工人 2 运行 上 local_queue 和 test_queue 全部 3 (t1,t2 和 t3) 应该 运行。
任务总数 运行s 应该是 5.
然而,当我运行这样的时候,只有3个任务是运行。
1) print_date 是工人 2 的 运行(这是正确的)
2) print_host 仅对工人 1 是 运行(不正确。两个工人都应该 运行)和
3) print_uptime 仅对工人 2 是 运行(也不正确。两个工人都应该 运行)
能否请您指导我如何进行设置,使 5 个任务成为 运行。在生产中,我想通过将机器分组到队列中来管理机器,对于所有具有 QUEUE_A -> 做 X 的机器和所有具有 QUEUE_B -> 做 Y 等的机器
谢谢
不是让一个工作人员处理 2 个队列,而是让每个工作人员处理一个队列。
所以工作命令应该是这样的:
airflow worker -q test_queue
airflow worker -q local_queue
然后有两个相同的任务,但在不同的队列中。
dag = DAG('dummy_for_testing', default_args=default_args)
t1 = BashOperator(
task_id='print_date',
bash_command='date >> /tmp/dag_output.log',
queue='test_queue',
dag=dag)
t3 = BashOperator(
task_id='print_host',
bash_command='hostname >> /tmp/dag_output.log',
queue='local_queue',
dag=dag)
t3_2 = BashOperator(
task_id='print_host_2',
bash_command='hostname >> /tmp/dag_output.log',
queue='test_queue',
dag=dag)
t2 = BashOperator(
task_id='print_uptime',
bash_command='uptime >> /tmp/dag_output.log',
queue='local_queue',
dag=dag)
t2_2 = BashOperator(
task_id='print_uptime_2',
bash_command='uptime >> /tmp/dag_output.log',
queue='test_queue',
dag=dag)
t2.set_upstream(t3)
t2.set_upstream(t3_2)
t2.set_upstream(t1)
t2_2.set_upstream(t3)
t2_2.set_upstream(t3_2)
t2_2.set_upstream(t1)
我刚刚用芹菜执行器设置了气流,这是我的 DAG 的框架
dag = DAG('dummy_for_testing', default_args=default_args)
t1 = BashOperator(
task_id='print_date',
bash_command='date >> /tmp/dag_output.log',
queue='test_queue',
dag=dag)
t3 = BashOperator(
task_id='print_host',
bash_command='hostname >> /tmp/dag_output.log',
queue='local_queue',
dag=dag)
t2 = BashOperator(
task_id='print_uptime',
bash_command='uptime >> /tmp/dag_output.log',
queue='local_queue',
dag=dag)
t2.set_upstream(t3)
t2.set_upstream(t1)
我有2个工人。其中一个 运行 只有一个名为 local_queue
的队列,另一个 运行 有两个名为 local_queue,test_queue
我想 运行 只在一台机器上执行任务 1,但在两台机器上执行任务 2 和 3。即在工人 1 运行 上只 local_queue,t2 和 t3 应该 运行 而在工人 2 运行 上 local_queue 和 test_queue 全部 3 (t1,t2 和 t3) 应该 运行。 任务总数 运行s 应该是 5.
然而,当我运行这样的时候,只有3个任务是运行。 1) print_date 是工人 2 的 运行(这是正确的) 2) print_host 仅对工人 1 是 运行(不正确。两个工人都应该 运行)和 3) print_uptime 仅对工人 2 是 运行(也不正确。两个工人都应该 运行)
能否请您指导我如何进行设置,使 5 个任务成为 运行。在生产中,我想通过将机器分组到队列中来管理机器,对于所有具有 QUEUE_A -> 做 X 的机器和所有具有 QUEUE_B -> 做 Y 等的机器
谢谢
不是让一个工作人员处理 2 个队列,而是让每个工作人员处理一个队列。 所以工作命令应该是这样的:
airflow worker -q test_queue
airflow worker -q local_queue
然后有两个相同的任务,但在不同的队列中。
dag = DAG('dummy_for_testing', default_args=default_args)
t1 = BashOperator(
task_id='print_date',
bash_command='date >> /tmp/dag_output.log',
queue='test_queue',
dag=dag)
t3 = BashOperator(
task_id='print_host',
bash_command='hostname >> /tmp/dag_output.log',
queue='local_queue',
dag=dag)
t3_2 = BashOperator(
task_id='print_host_2',
bash_command='hostname >> /tmp/dag_output.log',
queue='test_queue',
dag=dag)
t2 = BashOperator(
task_id='print_uptime',
bash_command='uptime >> /tmp/dag_output.log',
queue='local_queue',
dag=dag)
t2_2 = BashOperator(
task_id='print_uptime_2',
bash_command='uptime >> /tmp/dag_output.log',
queue='test_queue',
dag=dag)
t2.set_upstream(t3)
t2.set_upstream(t3_2)
t2.set_upstream(t1)
t2_2.set_upstream(t3)
t2_2.set_upstream(t3_2)
t2_2.set_upstream(t1)