Airflow DAG does not fail on Python script failure

I am using a Python script to download files from an FTP server, and I created a DAG for it. Sometimes the Python script fails with a "Peer connection reset" error while downloading files from the FTP server, but the Airflow DAG does not fail and marks the task as successful instead of failed.

Below are the Airflow logs for more information.


[2019-01-03 19:04:40,085] {base_task_runner.py:98} INFO - Subtask: [2019-01-03 19:04:40,085] {ssh_execute_operator.py:146} INFO - [2019-01-03 19:09:14,276 - Download files from SFTP - ERROR] Total 1 file(s) ([u'R0000797-Manifest.xml']) are downloaded successfully. One error is found in downloading file xxxxxx.txt due to Server connection dropped:
[2019-01-03 19:04:40,091] {base_task_runner.py:98} INFO - Subtask: [2019-01-03 19:04:40,090] {ssh_execute_operator.py:146} INFO - [2019-01-03 19:09:14,282 - Download files from SFTP - ERROR] The whole process failed due to Server connection dropped: .
[2019-01-03 19:04:40,091] {base_task_runner.py:98} INFO - Subtask: [2019-01-03 19:04:40,091] {ssh_execute_operator.py:146} INFO - Total 1 file(s) ([u'R0000797-Manifest.xml']) are downloaded successfully.
[2019-01-03 19:04:40,092] {base_task_runner.py:98} INFO - Subtask: [2019-01-03 19:04:40,091] {ssh_execute_operator.py:146} INFO - Traceback (most recent call last):
[2019-01-03 19:04:40,092] {base_task_runner.py:98} INFO - Subtask: [2019-01-03 19:04:40,091] {ssh_execute_operator.py:146} INFO - main(args)
[2019-01-03 19:04:40,092] {base_task_runner.py:98} INFO - Subtask: [2019-01-03 19:04:40,091] {ssh_execute_operator.py:146} INFO - File "/TEST/GET_files.py", line 381, in main
[2019-01-03 19:04:40,093] {base_task_runner.py:98} INFO - Subtask: [2019-01-03 19:04:40,092] {ssh_execute_operator.py:146} INFO - sftp.get(source_file)


As you can see from the above logs, the Python script passed a proper error message to the Airflow handler, but the handler logs that message as INFO and the task does not fail. Can you suggest how to handle this scenario? I want the DAG task to fail whenever a Python error occurs.

************************************
Here is the DAG code:

get_files = SSHExecuteOperator(
    task_id='get_files',
    bash_command='<command to run the py script>',
    ssh_hook=sshHook,
    dag=dag)

************************************

Expected results: the Airflow DAG should fail when the Python script fails.
Thanks in advance for your help.

Add set -e; to your bash_command. For example:

get_files = SSHExecuteOperator(
    task_id='get_files',
    bash_command='set -e; python example_script.py',
    ssh_hook=sshHook,
    dag=dag)
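
Note that set -e only makes the remote bash command fail if the Python script itself exits with a non-zero status. Judging by the log, the script catches the SFTP error and only logs it, so the exit code stays 0 and the task still looks green. Below is a minimal sketch (not your actual GET_files.py, whose contents I can't see) of an entry point that logs the error and then propagates it as a non-zero exit code:

def download_files():
    # Placeholder for the real SFTP download logic (e.g. paramiko's sftp.get()).
    raise IOError("Server connection dropped")

if __name__ == "__main__":
    import sys
    import logging

    logging.basicConfig(level=logging.INFO)
    log = logging.getLogger("Download files from SFTP")
    try:
        download_files()
    except Exception:
        # Log the error, then exit non-zero instead of swallowing the exception;
        # the non-zero exit status is what makes the bash command (and the task) fail.
        log.exception("The whole process failed")
        sys.exit(1)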

Why don't you use the PythonOperator?

Example here: https://github.com/trbs/airflow-examples/blob/master/dags/example_python_operator.py

Documentation here: https://airflow.apache.org/_modules/airflow/operators/python_operator.html
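
For illustration, here is a minimal sketch of that approach (the DAG id and the callable are hypothetical placeholders); the point is that any uncaught exception in python_callable makes Airflow mark the task as failed:

from datetime import datetime

from airflow import DAG
from airflow.operators.python_operator import PythonOperator


def get_files():
    # Placeholder for the SFTP download logic; raising (rather than just
    # logging) the error is what makes Airflow mark the task as failed.
    raise IOError("Server connection dropped")


dag = DAG(
    dag_id="get_files_example",        # hypothetical DAG id
    start_date=datetime(2019, 1, 1),
    schedule_interval=None,
)

get_files_task = PythonOperator(
    task_id="get_files",
    python_callable=get_files,
    dag=dag,
)

One caveat: the PythonOperator runs the callable on the Airflow worker itself, not over SSH, so this only applies if the worker can reach the FTP server directly.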