How to fail an Airflow task based on the response to a command prompt

I have this task in Airflow:

def bcp_in(set):
    files = []
    for file in glob.glob(Variable.get("temp_directory") + "offrs/{}*.txt".format(set)):
        files.append(file)
    print("LOCAL FILES {}".format(files))
    for file in files:
        print('Importing File {}'.format(file))
        cmd = '/opt/mssql-tools/bin/bcp buyerhero_staging.dbo.FILETYPE IN "{file}" -F2  -<<HOST>> -<<PASSWORD>> -S<<SERVER>> -t"|" -c'.format(table=set, file=file)
        print(os.popen(cmd).read())


BCP_Import_FILETYPE_Files = PythonOperator(
    task_id='BCP_Import_Files_to_DB3_Staging',
    python_callable=bcp_in,
    op_kwargs={'set': 'FILETYPE'},
    dag=dag
)

If everything works, that's fine. But if BCP fails, I need the task to fail. For example:

[2020-01-23 02:45:08,786] {logging_mixin.py:95} INFO - Importing File /home/airflow/airflow/staging/FILETYPE_000000000000.csv
[2020-01-23 02:45:09,505] {logging_mixin.py:95} INFO - 
Starting copy...
SQLState = 22005, NativeError = 0
Error = [Microsoft][ODBC Driver 17 for SQL Server]Invalid character value for cast specification
SQLState = 22005, NativeError = 0
Error = [Microsoft][ODBC Driver 17 for SQL Server]Invalid character value for cast specification
SQLState = 22005, NativeError = 0
Error = [Microsoft][ODBC Driver 17 for SQL Server]Invalid character value for cast specification
SQLState = 22005, NativeError = 0
Error = [Microsoft][ODBC Driver 17 for SQL Server]Invalid character value for cast specification
SQLState = 22005, NativeError = 0
Error = [Microsoft][ODBC Driver 17 for SQL Server]Invalid character value for cast specification
SQLState = 22005, NativeError = 0
Error = [Microsoft][ODBC Driver 17 for SQL Server]Invalid character value for cast specification
SQLState = 22005, NativeError = 0
Error = [Microsoft][ODBC Driver 17 for SQL Server]Invalid character value for cast specification
SQLState = 22005, NativeError = 0
Error = [Microsoft][ODBC Driver 17 for SQL Server]Invalid character value for cast specification
SQLState = 22005, NativeError = 0
Error = [Microsoft][ODBC Driver 17 for SQL Server]Invalid character value for cast specification
SQLState = 22005, NativeError = 0
Error = [Microsoft][ODBC Driver 17 for SQL Server]Invalid character value for cast specification

BCP copy in failed
[2020-01-23 02:45:09,505] {logging_mixin.py:95} INFO - 

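This BCP run failed, but the task still shows green. Unless I look at the logs, I won't know there is anything to troubleshoot.

How can I report the task as failed in this case? Or, even better... does anyone have a better operator for loading CSV/TXT files into MSSQL? Thanks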

You can use the bcp -e option to write any errors to a file, then use the contents of that file to decide whether the task succeeded.
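A minimal sketch of that check, assuming the command was run with `-e` pointing at a known path. The helper name `fail_if_bcp_errors` is hypothetical, and treating a missing or empty file as "no errors" is an assumption about how bcp behaves on a clean run:

```python
import os


def fail_if_bcp_errors(err_path):
    """Raise if the file written by bcp's -e option contains anything."""
    # Assumption: a missing or empty error file means no rows were rejected.
    if not os.path.exists(err_path) or os.path.getsize(err_path) == 0:
        return
    with open(err_path, errors="replace") as fh:
        details = fh.read()
    # Any uncaught exception in the callable marks the Airflow task as failed.
    raise RuntimeError("bcp wrote errors to {}:\n{}".format(err_path, details))
```

Inside bcp_in, this would be called once per file, right after the bcp command finishes, with the same path that was passed to -e.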

Use subprocess.check_output to run your command, for example:

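import glob
import shlex
import subprocess

from airflow.models import Variable

def bcp_in(set):
    # Collect the staged files for this set
    files = glob.glob(Variable.get("temp_directory") + "offrs/{}*.txt".format(set))
    print("LOCAL FILES {}".format(files))
    for file in files:
        print('Importing File {}'.format(file))
        cmd = '/opt/mssql-tools/bin/bcp buyerhero_staging.dbo.FILETYPE IN "{file}" -F2  -<<HOST>> -<<PASSWORD>> -S<<SERVER>> -t"|" -c'.format(file=file)
        # Without shell=True, check_output needs an argument list, hence shlex.split.
        # A non-zero exit code raises CalledProcessError, which fails the task.
        print(subprocess.check_output(shlex.split(cmd), universal_newlines=True))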

Details: https://docs.python.org/3.7/library/subprocess.html#subprocess.check_output

If the return code was non-zero it raises a CalledProcessError. The CalledProcessError object will have the return code in the returncode attribute and any output in the output attribute.
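As a rough sketch of how that exception could be used, the helper below logs the captured output before re-raising, so the bcp messages still reach the task log. The name `run_or_fail` is hypothetical. The extra check for the text "BCP copy in failed" (taken from the log in the question) is a fallback that assumes the exit code might not always be non-zero; it may be unnecessary.

```python
import subprocess

# Text that appears in the failing log shown in the question.
FAILURE_MARKER = "BCP copy in failed"


def run_or_fail(args):
    """Run a command given as an argument list; raise if it looks like it failed."""
    try:
        output = subprocess.check_output(
            args, stderr=subprocess.STDOUT, universal_newlines=True
        )
    except subprocess.CalledProcessError as exc:
        # Log what the command printed, then let the exception fail the task.
        print("Command exited with code {}".format(exc.returncode))
        print(exc.output)
        raise
    print(output)
    # Fallback (assumption): also fail if the output reports a failed copy.
    if FAILURE_MARKER in output:
        raise RuntimeError("Command output contains '{}'".format(FAILURE_MARKER))
    return output
```

Because Airflow marks a task as failed when the Python callable raises, calling something like this from bcp_in in place of os.popen should be enough to turn the task red.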