How to fail an Airflow task based on the response to a command prompt
I have this task in Airflow:
import glob
import os

from airflow.models import Variable
from airflow.operators.python_operator import PythonOperator  # Airflow 1.10 import path


def bcp_in(set):
    files = []
    for file in glob.glob(Variable.get("temp_directory") + "offrs/{}*.txt".format(set)):
        files.append(file)
    print("LOCAL FILES {}".format(files))
    for file in files:
        print('Importing File {}'.format(file))
        cmd = '/opt/mssql-tools/bin/bcp buyerhero_staging.dbo.FILETYPE IN "{file}" -F2 -<<HOST>> -<<PASSWORD>> -S<<SERVER>> -t"|" -c'.format(table=set, file=file)
        print(os.popen(cmd).read())


BCP_Import_FILETYPE_Files = PythonOperator(
    task_id='BCP_Import_Files_to_DB3_Staging',
    python_callable=bcp_in,
    op_kwargs={'set': 'FILETYPE'},
    dag=dag
)
If everything works, that's fine.
However, if BCP fails, I need the task to fail. For example:
[2020-01-23 02:45:08,786] {logging_mixin.py:95} INFO - Importing File /home/airflow/airflow/staging/FILETYPE_000000000000.csv
[2020-01-23 02:45:09,505] {logging_mixin.py:95} INFO -
Starting copy...
SQLState = 22005, NativeError = 0
Error = [Microsoft][ODBC Driver 17 for SQL Server]Invalid character value for cast specification
SQLState = 22005, NativeError = 0
Error = [Microsoft][ODBC Driver 17 for SQL Server]Invalid character value for cast specification
SQLState = 22005, NativeError = 0
Error = [Microsoft][ODBC Driver 17 for SQL Server]Invalid character value for cast specification
SQLState = 22005, NativeError = 0
Error = [Microsoft][ODBC Driver 17 for SQL Server]Invalid character value for cast specification
SQLState = 22005, NativeError = 0
Error = [Microsoft][ODBC Driver 17 for SQL Server]Invalid character value for cast specification
SQLState = 22005, NativeError = 0
Error = [Microsoft][ODBC Driver 17 for SQL Server]Invalid character value for cast specification
SQLState = 22005, NativeError = 0
Error = [Microsoft][ODBC Driver 17 for SQL Server]Invalid character value for cast specification
SQLState = 22005, NativeError = 0
Error = [Microsoft][ODBC Driver 17 for SQL Server]Invalid character value for cast specification
SQLState = 22005, NativeError = 0
Error = [Microsoft][ODBC Driver 17 for SQL Server]Invalid character value for cast specification
SQLState = 22005, NativeError = 0
Error = [Microsoft][ODBC Driver 17 for SQL Server]Invalid character value for cast specification
BCP copy in failed
[2020-01-23 02:45:09,505] {logging_mixin.py:95} INFO -
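This BCP call failed, but the task still shows green. Unless I look at the logs, I won't know there is anything to troubleshoot.
How can I make the task report a failure in this case?
Or, if it's a better option: does anyone have a better operator for loading CSV/TXT files into MSSQL? Thanks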
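A likely reason the task stays green: os.popen(cmd).read() returns only the command's output and throws away its exit status, so bcp_in returns normally and the PythonOperator marks the task as successful. The pipe's close() method does report the status (None on success). The sketch below keeps the os.popen call and raises when the status is non-zero; it assumes bcp exits with a non-zero status when the copy fails, and run_and_check is a hypothetical helper name, not part of Airflow.

```python
import os


def run_and_check(cmd):
    """Run a shell command via os.popen and raise if it exits non-zero."""
    pipe = os.popen(cmd)
    output = pipe.read()
    # close() returns None when the exit code is 0; otherwise it returns a
    # non-zero status (on POSIX, the exit code shifted left by 8 bits)
    status = pipe.close()
    # Print first so bcp's messages still show up in the task log
    print(output)
    if status is not None:
        # Any exception raised inside a PythonOperator callable fails the task
        raise RuntimeError("Command failed with status {}: {}".format(status, cmd))
    return output
```

Inside the loop, print(os.popen(cmd).read()) would become run_and_check(cmd).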
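You can use bcp's -e option to write the rows that bcp could not import to an error file, then use that file's contents to decide whether the task succeeded.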
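The -e approach could be sketched as a small check that runs after each bcp call. It assumes that a missing or empty error file means no rows were rejected; raise_if_bcp_errors is a hypothetical helper, and the error-file path is whatever you pass to -e (for example, one file per input file).

```python
import os


def raise_if_bcp_errors(error_file):
    """Fail the task if bcp wrote anything to the file given with -e."""
    # Assumption: a missing or empty error file means the load succeeded
    if not os.path.exists(error_file) or os.path.getsize(error_file) == 0:
        return
    with open(error_file) as f:
        errors = f.read()
    # Raising inside the PythonOperator callable marks the task as failed
    raise RuntimeError("bcp reported errors in {}:\n{}".format(error_file, errors))
```

With this, each bcp command would get an extra -e argument pointing at an error file, and raise_if_bcp_errors would be called with that same path right after the command finishes.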
Use subprocess.check_output to run your command, for example:
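import glob
import subprocess

from airflow.models import Variable


def bcp_in(set):
    files = []
    for file in glob.glob(Variable.get("temp_directory") + "offrs/{}*.txt".format(set)):
        files.append(file)
    print("LOCAL FILES {}".format(files))
    for file in files:
        print('Importing File {}'.format(file))
        cmd = '/opt/mssql-tools/bin/bcp buyerhero_staging.dbo.FILETYPE IN "{file}" -F2 -<<HOST>> -<<PASSWORD>> -S<<SERVER>> -t"|" -c'.format(table=set, file=file)
        # cmd is one string containing quoted arguments, so it has to run through a shell;
        # check_output raises CalledProcessError (failing the task) if bcp exits non-zero
        print(subprocess.check_output(cmd, shell=True, universal_newlines=True))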
Details: https://docs.python.org/3.7/library/subprocess.html#subprocess.check_output
If the return code was non-zero it raises a CalledProcessError. The CalledProcessError object will have the return code in the returncode attribute and any output in the output attribute.
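To keep bcp's messages in the task log even when it fails, you can catch CalledProcessError, print its output attribute, and re-raise. The sketch below uses subprocess.run (Python 3.7+) with an argument list instead of shell=True. run_logged is a hypothetical name, and as with check_output this only fails the task if bcp actually exits with a non-zero code.

```python
import subprocess


def run_logged(args):
    """Run a command, print its combined output, and re-raise on failure."""
    try:
        result = subprocess.run(
            args,
            check=True,                # raise CalledProcessError on a non-zero exit
            stdout=subprocess.PIPE,
            stderr=subprocess.STDOUT,  # merge stderr so every message is captured
            universal_newlines=True,   # decode output to str
        )
    except subprocess.CalledProcessError as exc:
        # exc.output holds everything the command printed before it failed
        print(exc.output)
        print("Command exited with code {}".format(exc.returncode))
        raise  # re-raising marks the Airflow task as failed
    print(result.stdout)
    return result.stdout
```

With an argument list no shell strips quotes, so each bcp argument becomes its own list element: the file path is passed unquoted and -t"|" becomes "-t|".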