迭代气流执行
iterative airflow execution
我有以下带有一些简单任务的 DAG,
hour_list = [“0:00”, “1:00", “2:00”]
for hour in hour_list:
bash_op = BashOperator(
task_id=‘task1_op1’+hour
,bash_command=“date”
,dag=dag
)
py_op = PythonOperator(
task_id='doit’+hour,
provide_context=True,
python_callable=python_method,
dag=dag)
py_op.set_upstream(bash_op)
现在,我看到 dag 在 0:00 到 2:00 的所有时间内都在并行执行。这是预期的行为。但是,我想 运行 一个小时后一个 dags,就像第二个小时的执行取决于第一个小时。我不确定设置中的任何更改在这里是否有帮助。我很欣赏你的想法。谢谢。
您可以使用 airflow.operators.sensors.TimeSensor
"in between" 任务完成此操作。类似于以下内容:
from datetime import time
from airflow.operators.sensors import TimeSensor
[...]
for hour in ["00:00", "01:00", "02:00"]:
TimeSensor(
dag=dag,
task_id="wait_{}".format(hour),
target_time=time(*map(int, hour.split(":")))
) >> BashOperator(
dag=dag,
task_id="task1_op1_{}".format(hour),
bash_command="date"
) >> PythonOperator(
dag=dag,
task_id="doit_{}".format(hour),
provide_context=True,
python_callable=python_method
)
我有以下带有一些简单任务的 DAG,
hour_list = [“0:00”, “1:00", “2:00”]
for hour in hour_list:
bash_op = BashOperator(
task_id=‘task1_op1’+hour
,bash_command=“date”
,dag=dag
)
py_op = PythonOperator(
task_id='doit’+hour,
provide_context=True,
python_callable=python_method,
dag=dag)
py_op.set_upstream(bash_op)
现在,我看到 dag 在 0:00 到 2:00 的所有时间内都在并行执行。这是预期的行为。但是,我想 运行 一个小时后一个 dags,就像第二个小时的执行取决于第一个小时。我不确定设置中的任何更改在这里是否有帮助。我很欣赏你的想法。谢谢。
您可以使用 airflow.operators.sensors.TimeSensor
"in between" 任务完成此操作。类似于以下内容:
from datetime import time
from airflow.operators.sensors import TimeSensor
[...]
for hour in ["00:00", "01:00", "02:00"]:
TimeSensor(
dag=dag,
task_id="wait_{}".format(hour),
target_time=time(*map(int, hour.split(":")))
) >> BashOperator(
dag=dag,
task_id="task1_op1_{}".format(hour),
bash_command="date"
) >> PythonOperator(
dag=dag,
task_id="doit_{}".format(hour),
provide_context=True,
python_callable=python_method
)