停止执行气流中的剩余任务
Stop performing remaining tasks in airflow
我有三个任务t1,t2,t3
。每个任务输出是下一个任务输入,例如,t1
输出是 t2
输入。 t1
完成后,我得到一个空的输出文件夹(这可能发生在我的情况下,这是可以接受的,并将 t1
标记为成功)但是 t2
无法获取输出t1
因为没有文件。如果没有文件,我想将 t2
和 t3
标记为成功。我怎样才能跳过接下来的两个任务。
我浏览了 airflow 文档和其他文章,了解传感器和 poke 方法。但是,不确定如何进行。
当文件不存在时,您可以利用 SensorOperator more specifically the FileSensorOperator to check if a file exists. You can then use the soft_fail 参数将任务标记为 "skipped"。这将允许 DAG 成功,同时维护文件检查中发生的事情的正确历史记录。
@andscoop 的回答很好,只是为了带来更多的想法:
可能的解决方案 1
我正在做类似的事情(依赖项 A > B > C)并且我已经解决了使用上一个任务默认推送的 XCOM 的方法。
Any value that the execute method returns is saved as an Xcom message
under the key return_value
. We’ll cover this topic later.
Source http://michal.karzynski.pl/blog/2017/03/19/developing-workflows-with-apache-airflow/
# copy&paste it into dags/Whosebug.py to test it
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from datetime import datetime
dag = DAG('Whosebug', description='Another Dag',
schedule_interval='* * * 1 1',
start_date=datetime(2018, 6, 27), catchup=False)
def do_a(**kwargs):
# Assuming that your TASK A is not returning a value
return None
task_a = PythonOperator(task_id='do_a',
python_callable=do_a,
provide_context=True,
dag=dag)
def do_b(**kwargs):
result_from_a = kwargs['ti'].xcom_pull(task_ids='do_a')
if result_from_a:
print("Continue with your second task")
else:
print("Send a notification somewhere, log something or stop the job here.")
task_b = PythonOperator(task_id='do_b',
python_callable=do_b,
provide_context=True,
dag=dag)
task_a >> task_b
可能的解决方案 2
分支。以更复杂的方式(并使用最佳实践),您可以执行分支以根据 t1
的结果确定下一个 step/task。我现在不能做一个合适的例子,但这里有 2 个来源来理解它是如何与例子一起工作的:
我有三个任务t1,t2,t3
。每个任务输出是下一个任务输入,例如,t1
输出是 t2
输入。 t1
完成后,我得到一个空的输出文件夹(这可能发生在我的情况下,这是可以接受的,并将 t1
标记为成功)但是 t2
无法获取输出t1
因为没有文件。如果没有文件,我想将 t2
和 t3
标记为成功。我怎样才能跳过接下来的两个任务。
我浏览了 airflow 文档和其他文章,了解传感器和 poke 方法。但是,不确定如何进行。
当文件不存在时,您可以利用 SensorOperator more specifically the FileSensorOperator to check if a file exists. You can then use the soft_fail 参数将任务标记为 "skipped"。这将允许 DAG 成功,同时维护文件检查中发生的事情的正确历史记录。
@andscoop 的回答很好,只是为了带来更多的想法:
可能的解决方案 1
我正在做类似的事情(依赖项 A > B > C)并且我已经解决了使用上一个任务默认推送的 XCOM 的方法。
Any value that the execute method returns is saved as an Xcom message under the key
return_value
. We’ll cover this topic later. Source http://michal.karzynski.pl/blog/2017/03/19/developing-workflows-with-apache-airflow/
# copy&paste it into dags/Whosebug.py to test it
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from datetime import datetime
dag = DAG('Whosebug', description='Another Dag',
schedule_interval='* * * 1 1',
start_date=datetime(2018, 6, 27), catchup=False)
def do_a(**kwargs):
# Assuming that your TASK A is not returning a value
return None
task_a = PythonOperator(task_id='do_a',
python_callable=do_a,
provide_context=True,
dag=dag)
def do_b(**kwargs):
result_from_a = kwargs['ti'].xcom_pull(task_ids='do_a')
if result_from_a:
print("Continue with your second task")
else:
print("Send a notification somewhere, log something or stop the job here.")
task_b = PythonOperator(task_id='do_b',
python_callable=do_b,
provide_context=True,
dag=dag)
task_a >> task_b
可能的解决方案 2
分支。以更复杂的方式(并使用最佳实践),您可以执行分支以根据 t1
的结果确定下一个 step/task。我现在不能做一个合适的例子,但这里有 2 个来源来理解它是如何与例子一起工作的: