Airflow SSHExecuteOperator() with env=... 未设置远程环境

Airflow SSHExecuteOperator() with env=... not setting remote environment

我正在修改调用进程的环境并附加到它的 PATH 以及设置一些新的环境变量。但是,当我在子进程中打印 os.environ 时,这些更改并没有反映出来。知道会发生什么吗?

我对实例脚本的调用:

ssh_hook = SSHHook(conn_id=ssh_conn_id)
temp_env = os.environ.copy()
temp_env["PATH"] = "/somepath:"+temp_env["PATH"]
run = SSHExecuteOperator(
        bash_command="python main.py",
        env=temp_env,
        ssh_hook=ssh_hook,
        task_id="run",
        dag=dag)

解释:实现分析

如果您查看 Airflow 的 SSHHook class, you'll see that it doesn't incorporate the env argument into the command being remotely run at all. The SSHExecuteOperator implementation 的源代码,将 env= 传递给钩子上的 Popen() 调用,但这只会将其传递给本地 subprocess.Popen()执行,到远程操作。

因此,简而言之:Airflow 不支持通过 SSH 传递环境变量。如果要有这样的支持,则需要将它们合并到正在远程执行的命令中,或者将 SendEnv 选项添加到正在本地执行的 ssh 命令中,以便为每个要发送的命令(即使这样仅当远程 sshd 配置为 AcceptEnv 将要接收的特定环境变量名称列入白名单时才有效。


解决方法:在命令行上传递环境变量

from pipes import quote # in Python 3, make this "from shlex import quote"

def with_prefix_from_env(env_dict, command=None):
    result = 'set -a; '
    for (k,v) in env_dict.items():
        result += '%s=%s; ' % (quote(k), quote(v))
    if command:
        result += command
    return result

SSHExecuteOperator(bash_command=prefix_from_env(temp_env, "python main.py"),
                   ssh_hook=ssh_hook, task_id="run", dag=dag)

解决方法:远程采购

如果您的环境变量是敏感的并且您不希望使用命令记录它们,您可以将它们带外传输并获取包含它们的远程文件。

from pipes import quote

def with_env_from_remote_file(filename, command):
  return "set -a; . %s; %s" % (quote(filename), command)

SSHExecuteOperator(bash_command=with_env_from_remote_file(envfile, "python main.py"),
                   ssh_hook=ssh_hook, task_id="run", dag=dag)

注意set -a指示shell导出所有定义的变量,因此被执行的文件只需要定义带有key=val声明的变量;它们将自动导出。如果从您的 Python 脚本生成此文件,请务必将键和值都用 pipes.quote() 引用,以确保它只执行赋值而不执行 运行 其他命令。 . 关键字是 POSIX 兼容的,等同于 bash source 命令。