Airflow Exception not being thrown when encountering None/Falsy values

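I am trying to pass data from one PythonOperator, _etl_lasic, to another, _download_s3_data. That part works fine. However, I want to raise an exception when the passed value is None, which should mark the task as failed.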

import airflow
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.exceptions import AirflowFailException


def _etl_lasic(**context):
    path_s3 = None
    context["task_instance"].xcom_push(
        key="path_s3",
        value=path_s3,
    )


def _download_s3_data(templates_dict, **context):
    path_s3 = templates_dict["path_s3"]
    if not path_s3:
        raise AirflowFailException("Path to S3 was not passed!")
    else:
        print(f"Path to S3: {path_s3}")


with DAG(
    dag_id="02_lasic_retraining_without_etl",
    start_date=airflow.utils.dates.days_ago(3),
    schedule_interval="@once",
) as dag:

    etl_lasic = PythonOperator(
        task_id="etl_lasic",
        python_callable=_etl_lasic,
    )

    download_s3_data = PythonOperator(
        task_id="download_s3_data",
        python_callable=_download_s3_data,
        templates_dict={
            "path_s3": "{{task_instance.xcom_pull(task_ids='etl_lasic',key='path_s3')}}"
        },
    )

    etl_lasic >> download_s3_data

Logs:

[2021-08-17 04:04:41,128] {logging_mixin.py:103} INFO - Path to S3: None
[2021-08-17 04:04:41,128] {python.py:118} INFO - Done. Returned value was: None
[2021-08-17 04:04:41,143] {taskinstance.py:1135} INFO - Marking task as SUCCESS. dag_id=02_lasic_retraining_without_etl, task_id=download_s3_data, execution_date=20210817T040439, start_date=20210817T040440, end_date=20210817T040441
[2021-08-17 04:04:41,189] {taskinstance.py:1195} INFO - 0 downstream tasks scheduled from follow-on schedule check
[2021-08-17 04:04:41,212] {local_task_job.py:118} INFO - Task exited with return code 0

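Jinja-templated values are rendered as strings by default. In your case, you push an XCom value of None, but pulling it via "{{task_instance.xcom_pull(task_ids='etl_lasic',key='path_s3')}}" renders it as the string "None". A non-empty string is truthy, so "not path_s3" evaluates to False and your current logic never raises the exception.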
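The difference can be reproduced with Jinja alone, outside Airflow. This is a minimal sketch assuming the jinja2 package is installed (Airflow depends on it). It compares jinja2's default string rendering with jinja2's NativeEnvironment. Airflow's native-rendering mode is built on the latter, though the exact Airflow environment class is not shown here.

```python
from jinja2 import Template
from jinja2.nativetypes import NativeEnvironment

template_source = "{{ value }}"

# Default Jinja rendering: the expression result is converted to a string.
string_result = Template(template_source).render(value=None)

# Native rendering: a single expression keeps its Python type.
native_result = NativeEnvironment().from_string(template_source).render(value=None)

print(repr(string_result))  # 'None'
print(repr(native_result))  # None

# The original check "if not path_s3:" only fires for falsy values.
print(bool(string_result))  # True: the string "None" slips through
print(bool(native_result))  # False: the check would raise
```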

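There are two ways to solve this:

  1. Instead of setting path_s3 to None in the _etl_lasic function, set it to an empty string. An empty string is still falsy after rendering.
  2. If you are using Airflow 2.1+, you can set the DAG-level parameter render_template_as_native_obj. It renders Jinja-templated values as native Python types (list, dict, etc.). Setting it to True fixes the problem without changing how path_s3 is set in the function. A conceptual example is documented here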