Airflow PythonOperator 任务无论如何都不会失败
Airflow PythonOperator task does not fail no matter what
我正在尝试 运行 我的 python 项目在 docker 中使用 airflow 2.0.1。
目录结构如下:
dags 在这里:path_to_airflow/dags/
python项目代码在这里:path_to_airflow/dags/utils
我在处理异常时遇到了奇怪的气流态度:无论是否有任何异常,我的 PythonOperator 任务总是以成功标记和退出代码 0 状态完成。
有人可以帮我解决这个问题吗?
这是 dag 代码:
from datetime import timedelta
from datetime import datetime
from airflow import DAG
from airflow.operators.dummy import DummyOperator
from airflow.operators.python_operator import PythonOperator
from utils.main import main
default_args = {
'owner': 'airflow',
'depends_on_past': False,
'retries': 0,
'retry_delay': timedelta(minutes=5),
}
dag = DAG(
'test',
default_args=default_args,
schedule_interval='0 0 * * *',
start_date=datetime(2021, 1, 1, 0, 0),
max_active_runs=1,
catchup=False,
)
task_dummy = DummyOperator(
task_id='task_dummy',
dag=dag,
)
task_1 = PythonOperator(
task_id='task_1',
python_callable=main,
dag=dag,
)
task_dummy >> task_1
主要功能代码如下:
import os
import sys
sys.path.insert(0, os.path.abspath(os.path.dirname(__file__)))
from airflow.exceptions import AirflowException
def main(data_path='/opt/airflow/dags/data/'):
...
some code
...
if condition:
raise AirflowException('empty data')
if __name__ == '__main__':
main()
这是 dag 的日志:
这是 dag 的最终状态:
用 BashOperator 替换 PythonOperator 时会发现所有异常,但我只想了解问题所在。
对于测试,您可以在主函数中不带条件地保留异常。
谢谢你的时间。
正如评论中所讨论的,问题是错误的观察。
我正在尝试 运行 我的 python 项目在 docker 中使用 airflow 2.0.1。
目录结构如下:
dags 在这里:path_to_airflow/dags/ python项目代码在这里:path_to_airflow/dags/utils
我在处理异常时遇到了奇怪的气流态度:无论是否有任何异常,我的 PythonOperator 任务总是以成功标记和退出代码 0 状态完成。 有人可以帮我解决这个问题吗?
这是 dag 代码:
from datetime import timedelta
from datetime import datetime
from airflow import DAG
from airflow.operators.dummy import DummyOperator
from airflow.operators.python_operator import PythonOperator
from utils.main import main
default_args = {
'owner': 'airflow',
'depends_on_past': False,
'retries': 0,
'retry_delay': timedelta(minutes=5),
}
dag = DAG(
'test',
default_args=default_args,
schedule_interval='0 0 * * *',
start_date=datetime(2021, 1, 1, 0, 0),
max_active_runs=1,
catchup=False,
)
task_dummy = DummyOperator(
task_id='task_dummy',
dag=dag,
)
task_1 = PythonOperator(
task_id='task_1',
python_callable=main,
dag=dag,
)
task_dummy >> task_1
主要功能代码如下:
import os
import sys
sys.path.insert(0, os.path.abspath(os.path.dirname(__file__)))
from airflow.exceptions import AirflowException
def main(data_path='/opt/airflow/dags/data/'):
...
some code
...
if condition:
raise AirflowException('empty data')
if __name__ == '__main__':
main()
这是 dag 的日志:
这是 dag 的最终状态:
用 BashOperator 替换 PythonOperator 时会发现所有异常,但我只想了解问题所在。 对于测试,您可以在主函数中不带条件地保留异常。 谢谢你的时间。
正如评论中所讨论的,问题是错误的观察。