Airflow PythonOperator 任务无论如何都不会失败

Airflow PythonOperator task does not fail no matter what

我正在尝试 运行 我的 python 项目在 docker 中使用 airflow 2.0.1。

目录结构如下:

dags 在这里:path_to_airflow/dags/ python项目代码在这里:path_to_airflow/dags/utils

我在处理异常时遇到了奇怪的气流态度:无论是否有任何异常,我的 PythonOperator 任务总是以成功标记和退出代码 0 状态完成。 有人可以帮我解决这个问题吗?

这是 dag 代码:

from datetime import timedelta
from datetime import datetime
from airflow import DAG
from airflow.operators.dummy import DummyOperator
from airflow.operators.python_operator import PythonOperator

from utils.main import main

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'retries': 0,
    'retry_delay': timedelta(minutes=5),
}

dag = DAG(
    'test',
    default_args=default_args,
    schedule_interval='0 0 * * *',
    start_date=datetime(2021, 1, 1, 0, 0),
    max_active_runs=1,
    catchup=False,
)

task_dummy = DummyOperator(
    task_id='task_dummy',
    dag=dag,
)

task_1 = PythonOperator(
    task_id='task_1',
    python_callable=main,
    dag=dag,
)

task_dummy >> task_1

主要功能代码如下:

import os
import sys

sys.path.insert(0, os.path.abspath(os.path.dirname(__file__)))

from airflow.exceptions import AirflowException

def main(data_path='/opt/airflow/dags/data/'):

    ...
    some code
    ...
    
    if condition:
        raise AirflowException('empty data')

if __name__ == '__main__':
    main()

这是 dag 的日志:

这是 dag 的最终状态:

用 BashOperator 替换 PythonOperator 时会发现所有异常,但我只想了解问题所在。 对于测试,您可以在主函数中不带条件地保留异常。 谢谢你的时间。

正如评论中所讨论的,问题是错误的观察。