subprocess command inside Airflow fails to find executable and associated file

I have this task in Airflow:

def bcp_in(**kwargs):
    files = []
    for file in glob.glob(Variable.get("temp_directory") + "FILEGROUP1/FILE*.csv"):
        files.append(file)
    print("LOCAL FILES {}".format(files))
    for file in files:
        print('Importing File {}'.format(file))
        cmd = '/opt/mssql-tools/bin/bcp db1.dbo.table1 IN "{file}" -F2  -Utheuser -Pmypassword -Sserver1 -t"~" -c'.format(file=file)
        print(subprocess.check_output(cmd))


BCP_Import_Files = PythonOperator(
    task_id='BCP_Import_Files_to_DB3_Staging',
    python_callable=bcp_in,
    dag=dag
)

Given that the user Airflow runs under is airflow, I tested bcp as that user from the command prompt without any trouble.

However, when I run this under normal Airflow operation, the task fails with this error:

[2020-01-27 20:18:28,731] {taskinstance.py:1051} ERROR - [Errno 2] No such file or directory: '/opt/mssql-tools/bin/bcp db1.dbo.table1 IN "file1" -F2  -Utheuser -Pmypassword -Sserver1 -t"~" -c': '/opt/mssql-tools/bin/bcp db1.dbo.table1 IN "file1" -F2  -Utheuser -Pmypassword -Sserver1 -t"~" -c'
Traceback (most recent call last):
  File "/usr/local/lib/python3.7/site-packages/airflow/models/taskinstance.py", line 926, in _run_raw_task
    result = task_copy.execute(context=context)
  File "/usr/local/lib/python3.7/site-packages/airflow/operators/python_operator.py", line 113, in execute
    return_value = self.execute_callable()
  File "/usr/local/lib/python3.7/site-packages/airflow/operators/python_operator.py", line 118, in execute_callable
    return self.python_callable(*self.op_args, **self.op_kwargs)
  File "/home/airflow/airflow/dags/BCP_TEST_NOT_PROD.py", line 30, in bcp_in
    print(subprocess.check_output(cmd))
  File "/usr/local/lib/python3.7/subprocess.py", line 395, in check_output
    **kwargs).stdout
  File "/usr/local/lib/python3.7/subprocess.py", line 472, in run
    with Popen(*popenargs, **kwargs) as process:
  File "/usr/local/lib/python3.7/subprocess.py", line 775, in __init__
    restore_signals, start_new_session)
  File "/usr/local/lib/python3.7/subprocess.py", line 1522, in _execute_child
    raise child_exception_type(errno_num, err_msg, err_filename)
FileNotFoundError: [Errno 2] No such file or directory: '/opt/mssql-tools/bin/bcp db1.dbo.table1 IN "file1" -F2  -Utheuser -Pmypassword -Sserver1 -t"~" -c': '/opt/mssql-tools/bin/bcp db1.dbo.table1 IN "file1.csv" -F2  -Utheuser -Pmypassword -Sserver1 -t"~" -c'

For anyone who runs into the same problem:

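The subprocess call needs shell=True whenever the command and its arguments are passed as a single string. Without it, on POSIX systems subprocess treats the entire string as the path of the executable, which is why the error reports the whole command line as a missing file. Example: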

print(subprocess.check_output(cmd, shell=True))
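
To see the difference in isolation, here is a minimal sketch that should reproduce the behaviour on a POSIX system. It assumes nothing about bcp: the Python interpreter (sys.executable) stands in for the real executable, and run_without_shell and run_with_shell are hypothetical helper names introduced only for this illustration. On Windows the first call may behave differently, because the command string is handed to the OS as-is.

```python
import subprocess
import sys

# Stand-in for the bcp command line (an assumption for illustration):
# an executable path followed by arguments, all in one string.
cmd = '"{}" -c "print(42)"'.format(sys.executable)


def run_without_shell(command):
    """Return the OSError raised when a full command string is run without a shell."""
    try:
        subprocess.check_output(command)
    except OSError as exc:
        # On POSIX the whole string is looked up as one file name and is not found.
        return exc
    return None


def run_with_shell(command):
    """Let the shell split the string into executable and arguments."""
    return subprocess.check_output(command, shell=True).decode().strip()


error = run_without_shell(cmd)
print(type(error).__name__, error)
print(run_with_shell(cmd))
```

The first call fails in the same way as the Airflow task, while the second one lets the shell parse the string before the executable is started.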

If you only pass the executable itself with no arguments (for example a shell script such as file.sh), you can leave shell out. Example:

print(subprocess.check_output('path/to/exe.sh'))
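
An alternative that is not part of the fix above, but that the subprocess module also supports, is to pass the command as a list of arguments, which works without shell=True. The sketch below is only an illustration: bcp_args is a hypothetical helper, the bcp call itself is never executed here, and the runnable part uses the Python interpreter as a stand-in executable.

```python
import shlex
import subprocess
import sys


def bcp_args(file):
    """Hypothetical helper: the bcp command from the question as an argument list."""
    return [
        "/opt/mssql-tools/bin/bcp", "db1.dbo.table1", "IN", file,
        "-F2", "-Utheuser", "-Pmypassword", "-Sserver1", "-t~", "-c",
    ]


# shlex.split applies shell-like quoting rules to the original command string,
# producing one list element per argument.
cmd = '/opt/mssql-tools/bin/bcp db1.dbo.table1 IN "{file}" -F2  -Utheuser -Pmypassword -Sserver1 -t"~" -c'.format(file="file1.csv")
split_args = shlex.split(cmd)

# Runnable stand-in: with a list, each element reaches the program as one
# argument, even when it contains spaces, and no shell is involved.
demo = subprocess.check_output(
    [sys.executable, "-c", "import sys; print(sys.argv[1])", "FILE 1.csv"]
)
demo_text = demo.decode().strip()
print(split_args)
print(demo_text)
```

With the list form, a call such as subprocess.check_output(bcp_args(file)) would not need a shell, and file names containing spaces or quotes would not need extra escaping.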

I hope this helps others.