根据气流中的上游任务杀死下游任务

Kill downstream task depending on upstream task in airflow

我有一个 DAG,其中有 2 个任务。

t1=  PythonOperator(
    task_id='Check_Files_in_S3',
    provide_context=False,
    python_callable=checkFilesInS3,
    xcom_push=True,
    dag=dag)

t2 =  PythonOperator(
    task_id='snowflakeLoad',
    provide_context=True,
    python_callable=snowflakeLoad,
    xcom_push=True,
    dag=dag)

t1>>t2

第一个任务在 S3 中查找文件,如果文件可用,则任务应该成功,并且应该使用下游任务将数据加载到雪花。

我的要求是,如果文件不可用,那么 task1 需要发送邮件,然后它应该停止下游任务 (task2)。有没有办法实现这一点,比如返回 false 会使下游任务失败?如果早期的 DAG 运行 失败,depends_on_past 也不会 运行 DAG 吗?或者它不会 运行 下游任务,如果其中一个任务在当前失败 运行?

你有很多问题,所以让我分解一下。

Is there a way to achieve this something like returning false will fail the downstream task?

等待某事发生的常用方法是使用传感器(继承自 BaseSensorOperator)class。它们专门用于资源消耗低且易于配置以感测外部 objects/resources。 但是,在您的情况下,这是一个检查并忘记(如果不存在则不要重试)。然后你可以坚持使用 PythonOperator。您只需提出一个异常,它就会被视为任务失败,从而阻止下游任务(在您的情况下为 t2)运行.

also depends_on_past will not run the DAG if the earlier DAG run fails? or it don't run the downstream tasks if one of the task failed in the present run?

如果我们有您的两个任务,假设我们有两个 DAGRun。一个在时间点 x,一个在时间点 x + 1。如果您有 depends_on_past=Truex + 1 的任务 t1 的 运行 将在时间 x 查看 运行 t1 并将仅在 运行 成功时才开始。 这同样适用于 x + 1t2,它将检查 x + 1 的任务 t1 是否已完成,然后检查时间 xt2 是否成功.

当没有文件时,您可以在checkFilesInS3中提高AirflowSkipException。这将导致在引发异常之前也跳过下游任务,您也可以发送电子邮件。

from airflow import DAG
from datetime import datetime
from airflow.exceptions import AirflowSkipException
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import PythonOperator
from airflow.utils.email import send_email


def checkFilesInS3(**kwargs):
    your_code
    if no_files: #Raise exception to make downstream tasks skip
        send_email(to="someone@somewhere.com, subject="task failed!"")
        raise AirflowSkipException("No files found!")


def snowflakeLoad(**kwargs):
    pass


default_args = {
    'owner': 'airflow',
    'start_date': datetime(2021, 6, 23),

}
with DAG(dag_id='question',
         default_args=default_args,
         schedule_interval=None,
         catchup=False
         ) as dag:

    start = DummyOperator(task_id='start_task')

    t1 = PythonOperator(
        task_id='Check_Files_in_S3',
        python_callable=checkFilesInS3,
    )

    t2 = PythonOperator(
        task_id='snowflakeLoad',
        python_callable=snowflakeLoad,
    )

    start >> t1 >> t2