
Stop performing remaining tasks in airflow

我有三个任务t1,t2,t3。每个任务输出是下一个任务输入,例如,t1 输出是 t2 输入。 t1 完成后,我得到一个空的输出文件夹(这可能发生在我的情况下,这是可以接受的,并将 t1 标记为成功)但是 t2 无法获取输出t1 因为没有文件。如果没有文件,我想将 t2t3 标记为成功。我怎样才能跳过接下来的两个任务。

我浏览了 airflow 文档和其他文章,了解传感器和 poke 方法。但是,不确定如何进行。

当文件不存在时,您可以利用 SensorOperator more specifically the FileSensorOperator to check if a file exists. You can then use the soft_fail 参数将任务标记为 "skipped"。这将允许 DAG 成功,同时维护文件检查中发生的事情的正确历史记录。

@andscoop 的回答很好,只是为了带来更多的想法:

可能的解决方案 1

我正在做类似的事情(依赖项 A > B > C)并且我已经解决了使用上一个任务默认推送的 XCOM 的方法。

Any value that the execute method returns is saved as an Xcom message under the key return_value. We’ll cover this topic later. Source http://michal.karzynski.pl/blog/2017/03/19/developing-workflows-with-apache-airflow/

# copy&paste it into dags/Whosebug.py to test it

from airflow import DAG
from airflow.operators.python_operator import PythonOperator

from datetime import datetime

dag = DAG('Whosebug', description='Another Dag',
          schedule_interval='* * * 1 1',
          start_date=datetime(2018, 6, 27), catchup=False)

def do_a(**kwargs):
    # Assuming that your TASK A is not returning a value
    return None

task_a = PythonOperator(task_id='do_a',

def do_b(**kwargs):
    result_from_a = kwargs['ti'].xcom_pull(task_ids='do_a')
    if result_from_a:
        print("Continue with your second task")
        print("Send a notification somewhere, log something or stop the job here.")

task_b = PythonOperator(task_id='do_b',
task_a >> task_b

可能的解决方案 2

分支。以更复杂的方式(并使用最佳实践),您可以执行分支以根据 t1 的结果确定下一个 step/task。我现在不能做一个合适的例子,但这里有 2 个来源来理解它是如何与例子一起工作的: