迭代列表的气流任务应该 运行 顺序
Airflow tasks iterating over list should run sequentially
我运行正在根据列表执行任务。任务 ID 根据列表递增。这些任务完成后我要执行另一个任务。以下是代码:
with DAG('test',) as dag:
t1 = [PythonOperator(
task_id=f"task_hour_{hours}",
python_callable=hourly_job,
op_kwargs={
"hour": hours
}
) for hours in ['01', '02', '03']
]
t2 = PythonOperator(
task_id="daily",
python_callable=daily_job
)
t1 >> t2
发生的事情是这些 hourly
任务都是 运行 并发的,每个任务都有 daily
任务。像这样:
task_hour_01 >> daily
task_hour_02 >> daily
task_hour_03 >> daily
我想要发生的是这些 hourly
任务应该按顺序执行,最后 daily
任务应该执行:
task_hour_01 >> task_hour_02 >> task_hour_03 >> daily
所以有两个问题:
- 任务应该 运行 顺序。
- 日常任务应该是最后执行的任务,并且应该 运行
一次。
基于此 ,您可以使用辅助变量 t0
来初始化和处理您的运算符。
with DAG('test',) as dag:
hours = ['01', '02', '03']
t0 = None
for hour in hours:
t1 = PythonOperator(
task_id=f"task_hour_{hours}",
python_callable=hourly_job,
op_kwargs={"hour": hour}
if t0 is not None:
t0 >> t1
t0 = t1
t2 = PythonOperator(
task_id="daily",
python_callable=daily_job
)
t1 >> t2
您的代码方向正确,您只是缺少使用 chain
:
设置依赖项
from datetime import datetime
from airflow import DAG
from airflow.models.baseoperator import chain
from airflow.operators.python import PythonOperator
# Replace with your function logic
def hourly_job():
return 'hourly'
# Replace with your function logic
def daily_job():
return 'daily'
with DAG(dag_id='test', start_date=datetime(2022, 2, 16)) as dag:
hours = ['01', '02', '03']
op_list = [
PythonOperator(task_id=f"task_hour_{hour}", python_callable=hourly_job, op_kwargs={"hour": hour})
for hour in hours]
chain(*op_list)
t2 = PythonOperator(
task_id="daily",
python_callable=daily_job
)
op_list[-1] >> t2
我运行正在根据列表执行任务。任务 ID 根据列表递增。这些任务完成后我要执行另一个任务。以下是代码:
with DAG('test',) as dag:
t1 = [PythonOperator(
task_id=f"task_hour_{hours}",
python_callable=hourly_job,
op_kwargs={
"hour": hours
}
) for hours in ['01', '02', '03']
]
t2 = PythonOperator(
task_id="daily",
python_callable=daily_job
)
t1 >> t2
发生的事情是这些 hourly
任务都是 运行 并发的,每个任务都有 daily
任务。像这样:
task_hour_01 >> daily
task_hour_02 >> daily
task_hour_03 >> daily
我想要发生的是这些 hourly
任务应该按顺序执行,最后 daily
任务应该执行:
task_hour_01 >> task_hour_02 >> task_hour_03 >> daily
所以有两个问题:
- 任务应该 运行 顺序。
- 日常任务应该是最后执行的任务,并且应该 运行 一次。
基于此 t0
来初始化和处理您的运算符。
with DAG('test',) as dag:
hours = ['01', '02', '03']
t0 = None
for hour in hours:
t1 = PythonOperator(
task_id=f"task_hour_{hours}",
python_callable=hourly_job,
op_kwargs={"hour": hour}
if t0 is not None:
t0 >> t1
t0 = t1
t2 = PythonOperator(
task_id="daily",
python_callable=daily_job
)
t1 >> t2
您的代码方向正确,您只是缺少使用 chain
:
from datetime import datetime
from airflow import DAG
from airflow.models.baseoperator import chain
from airflow.operators.python import PythonOperator
# Replace with your function logic
def hourly_job():
return 'hourly'
# Replace with your function logic
def daily_job():
return 'daily'
with DAG(dag_id='test', start_date=datetime(2022, 2, 16)) as dag:
hours = ['01', '02', '03']
op_list = [
PythonOperator(task_id=f"task_hour_{hour}", python_callable=hourly_job, op_kwargs={"hour": hour})
for hour in hours]
chain(*op_list)
t2 = PythonOperator(
task_id="daily",
python_callable=daily_job
)
op_list[-1] >> t2