Airflow tasks iterating over list should run sequentially

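I am running tasks generated from a list, and each task ID is built from the list items. After all of these tasks finish, I want to run one more task. Here is the code: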

with DAG('test',) as dag:

    t1 = [
        PythonOperator(
            task_id=f"task_hour_{hours}",
            python_callable=hourly_job,
            op_kwargs={"hour": hours},
        )
        for hours in ['01', '02', '03']
    ]

    t2 = PythonOperator(
        task_id="daily",
        python_callable=daily_job,
    )
    t1 >> t2

What actually happens is that the hourly tasks all run concurrently, and each of them is directly upstream of the daily task, like this:

task_hour_01 >> daily
task_hour_02 >> daily
task_hour_03 >> daily

What I want is for the hourly tasks to run one after another, with the daily task running last:

task_hour_01 >> task_hour_02 >> task_hour_03 >> daily
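To see why the question's code fans in instead of chaining, here is a toy, stdlib-only model of the >> operator. The Task class and the module-level edges list are hypothetical stand-ins that only record dependencies; they are not Airflow's implementation. They mimic the one behavior that matters here: a list on the left of >> makes every member upstream of the right-hand task without linking the members to each other.

```python
# Hypothetical toy model of Airflow's `>>`: it only records edges.
edges = []  # (upstream_id, downstream_id) pairs, in the order they are set


class Task:
    def __init__(self, task_id):
        self.task_id = task_id

    def __rshift__(self, other):
        # task >> task: record one edge and return the right-hand task,
        # so that a >> b >> c links a to b, then b to c
        edges.append((self.task_id, other.task_id))
        return other

    def __rrshift__(self, other):
        # [tasks] >> task: Python calls this because list has no `>>`.
        # Every list member becomes upstream of self; the members are
        # NOT linked to each other.
        for task in other:
            edges.append((task.task_id, self.task_id))
        return self


hourly = [Task(f"task_hour_{hour}") for hour in ["01", "02", "03"]]
daily = Task("daily")

hourly >> daily  # same shape as the question's `t1 >> t2`
fan_in = list(edges)

edges.clear()
hourly[0] >> hourly[1] >> hourly[2] >> daily  # the desired chain
linear = list(edges)

print(fan_in)
print(linear)
```

The first print shows three edges that all end at daily; the second shows task_hour_01 to task_hour_02, task_hour_02 to task_hour_03, and task_hour_03 to daily, which is the sequential order the question asks for.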

So there are two questions:

Based on this, you can use a helper variable t0 to keep track of the previously created operator and link each new operator to it:

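from airflow import DAG
from airflow.operators.python import PythonOperator

# hourly_job and daily_job are the callables from the question
with DAG('test',) as dag:
    hours = ['01', '02', '03']
    t0 = None  # the previously created hourly task (none before the first)

    for hour in hours:
        t1 = PythonOperator(
            task_id=f"task_hour_{hour}",  # one unique id per hour
            python_callable=hourly_job,
            op_kwargs={"hour": hour},
        )
        # Make each hourly task wait for the one created before it
        if t0 is not None:
            t0 >> t1
        t0 = t1

    t2 = PythonOperator(
        task_id="daily",
        python_callable=daily_job,
    )
    # After the loop, t1 is the last hourly task (task_hour_03)
    t1 >> t2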
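The helper-variable loop links each task to the one built in the previous iteration. The same pairwise linking can also be written with zip(ops, ops[1:]). The sketch below uses plain task-id strings and a hypothetical link function that only records an edge, so it runs without Airflow; in a real DAG the loop body would be upstream >> downstream.

```python
def link(edges, upstream, downstream):
    """Hypothetical stand-in for `upstream >> downstream`; records the edge."""
    edges.append((upstream, downstream))


task_ids = [f"task_hour_{hour}" for hour in ["01", "02", "03"]]

# Pattern from the answer: remember the previous task in a helper variable
prev_edges = []
t0 = None
for t1 in task_ids:
    if t0 is not None:
        link(prev_edges, t0, t1)
    t0 = t1
link(prev_edges, t0, "daily")  # t0 is now the last hourly task

# Equivalent pairwise version: zip each task with its successor
zip_edges = []
for upstream, downstream in zip(task_ids, task_ids[1:]):
    link(zip_edges, upstream, downstream)
link(zip_edges, task_ids[-1], "daily")

print(prev_edges == zip_edges)  # True
```

Both versions assume the list of hours is non-empty. In the answer's DAG code, an empty hours list means the loop never assigns t1, so the final t1 >> t2 would raise a NameError.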

Your code is heading in the right direction; you are just missing chain to set the dependencies:

from datetime import datetime

from airflow import DAG
from airflow.models.baseoperator import chain
from airflow.operators.python import PythonOperator

# Replace with your function logic; `hour` is passed in via op_kwargs
def hourly_job(hour):
    return f'hourly {hour}'


# Replace with your function logic
def daily_job():
    return 'daily'


with DAG(dag_id='test', start_date=datetime(2022, 2, 16)) as dag:
    hours = ['01', '02', '03']
    op_list = [
        PythonOperator(task_id=f"task_hour_{hour}", python_callable=hourly_job, op_kwargs={"hour": hour})
        for hour in hours]
    chain(*op_list)
    t2 = PythonOperator(
        task_id="daily",
        python_callable=daily_job
    )

    op_list[-1] >> t2
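Conceptually, chain(*op_list) sets op_list[0] >> op_list[1], op_list[1] >> op_list[2], and so on. The sketch below is a hypothetical, stdlib-only toy_chain for plain task ids (it ignores the list arguments that Airflow's chain also accepts) and uses graphlib.TopologicalSorter (Python 3.9+) to check that the resulting graph allows only one execution order, while the question's fan-in leaves all hourly tasks ready at the same time.

```python
from graphlib import TopologicalSorter


def toy_chain(*task_ids):
    """Hypothetical sketch of what chain(*tasks) does for single tasks.

    Links each task to the next one and returns {task_id: set_of_upstreams},
    the mapping format graphlib.TopologicalSorter accepts.
    """
    graph = {task_id: set() for task_id in task_ids}
    for upstream, downstream in zip(task_ids, task_ids[1:]):
        graph[downstream].add(upstream)
    return graph


hours = ["01", "02", "03"]
op_list = [f"task_hour_{hour}" for hour in hours]

graph = toy_chain(*op_list)
graph["daily"] = {op_list[-1]}  # op_list[-1] >> t2

# A straight line admits exactly one execution order
order = list(TopologicalSorter(graph).static_order())
print(order)  # ['task_hour_01', 'task_hour_02', 'task_hour_03', 'daily']

# For contrast, the question's fan-in: all hourly tasks are ready at once
sorter = TopologicalSorter({"daily": set(op_list)})
sorter.prepare()
ready = sorted(sorter.get_ready())
print(ready)  # ['task_hour_01', 'task_hour_02', 'task_hour_03']
```

Since Airflow's chain takes any number of tasks, chain(*op_list, t2) should set the same dependencies as chain(*op_list) followed by op_list[-1] >> t2; keeping them separate, as in the answer, is mainly a readability choice.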