Airflow SSHExecuteOperator() with env=... 未设置远程环境
Airflow SSHExecuteOperator() with env=... not setting remote environment
我正在修改调用进程的环境并附加到它的 PATH 以及设置一些新的环境变量。但是,当我在子进程中打印 os.environ 时,这些更改并没有反映出来。知道会发生什么吗?
我对实例脚本的调用:
ssh_hook = SSHHook(conn_id=ssh_conn_id)
temp_env = os.environ.copy()
temp_env["PATH"] = "/somepath:"+temp_env["PATH"]
run = SSHExecuteOperator(
bash_command="python main.py",
env=temp_env,
ssh_hook=ssh_hook,
task_id="run",
dag=dag)
解释:实现分析
如果您查看 Airflow 的 SSHHook
class, you'll see that it doesn't incorporate the env
argument into the command being remotely run at all. The SSHExecuteOperator
implementation 的源代码,将 env=
传递给钩子上的 Popen()
调用,但这只会将其传递给本地 subprocess.Popen()
执行,不到远程操作。
因此,简而言之:Airflow 不支持通过 SSH 传递环境变量。如果要有这样的支持,则需要将它们合并到正在远程执行的命令中,或者将 SendEnv
选项添加到正在本地执行的 ssh 命令中,以便为每个要发送的命令(即使这样仅当远程 sshd 配置为 AcceptEnv
将要接收的特定环境变量名称列入白名单时才有效。
解决方法:在命令行上传递环境变量
from pipes import quote # in Python 3, make this "from shlex import quote"
def with_prefix_from_env(env_dict, command=None):
result = 'set -a; '
for (k,v) in env_dict.items():
result += '%s=%s; ' % (quote(k), quote(v))
if command:
result += command
return result
SSHExecuteOperator(bash_command=prefix_from_env(temp_env, "python main.py"),
ssh_hook=ssh_hook, task_id="run", dag=dag)
解决方法:远程采购
如果您的环境变量是敏感的并且您不希望使用命令记录它们,您可以将它们带外传输并获取包含它们的远程文件。
from pipes import quote
def with_env_from_remote_file(filename, command):
return "set -a; . %s; %s" % (quote(filename), command)
SSHExecuteOperator(bash_command=with_env_from_remote_file(envfile, "python main.py"),
ssh_hook=ssh_hook, task_id="run", dag=dag)
注意set -a
指示shell导出所有定义的变量,因此被执行的文件只需要定义带有key=val
声明的变量;它们将自动导出。如果从您的 Python 脚本生成此文件,请务必将键和值都用 pipes.quote()
引用,以确保它只执行赋值而不执行 运行 其他命令。 .
关键字是 POSIX 兼容的,等同于 bash source
命令。
我正在修改调用进程的环境并附加到它的 PATH 以及设置一些新的环境变量。但是,当我在子进程中打印 os.environ 时,这些更改并没有反映出来。知道会发生什么吗?
我对实例脚本的调用:
ssh_hook = SSHHook(conn_id=ssh_conn_id)
temp_env = os.environ.copy()
temp_env["PATH"] = "/somepath:"+temp_env["PATH"]
run = SSHExecuteOperator(
bash_command="python main.py",
env=temp_env,
ssh_hook=ssh_hook,
task_id="run",
dag=dag)
解释:实现分析
如果您查看 Airflow 的 SSHHook
class, you'll see that it doesn't incorporate the env
argument into the command being remotely run at all. The SSHExecuteOperator
implementation 的源代码,将 env=
传递给钩子上的 Popen()
调用,但这只会将其传递给本地 subprocess.Popen()
执行,不到远程操作。
因此,简而言之:Airflow 不支持通过 SSH 传递环境变量。如果要有这样的支持,则需要将它们合并到正在远程执行的命令中,或者将 SendEnv
选项添加到正在本地执行的 ssh 命令中,以便为每个要发送的命令(即使这样仅当远程 sshd 配置为 AcceptEnv
将要接收的特定环境变量名称列入白名单时才有效。
解决方法:在命令行上传递环境变量
from pipes import quote # in Python 3, make this "from shlex import quote"
def with_prefix_from_env(env_dict, command=None):
result = 'set -a; '
for (k,v) in env_dict.items():
result += '%s=%s; ' % (quote(k), quote(v))
if command:
result += command
return result
SSHExecuteOperator(bash_command=prefix_from_env(temp_env, "python main.py"),
ssh_hook=ssh_hook, task_id="run", dag=dag)
解决方法:远程采购
如果您的环境变量是敏感的并且您不希望使用命令记录它们,您可以将它们带外传输并获取包含它们的远程文件。
from pipes import quote
def with_env_from_remote_file(filename, command):
return "set -a; . %s; %s" % (quote(filename), command)
SSHExecuteOperator(bash_command=with_env_from_remote_file(envfile, "python main.py"),
ssh_hook=ssh_hook, task_id="run", dag=dag)
注意set -a
指示shell导出所有定义的变量,因此被执行的文件只需要定义带有key=val
声明的变量;它们将自动导出。如果从您的 Python 脚本生成此文件,请务必将键和值都用 pipes.quote()
引用,以确保它只执行赋值而不执行 运行 其他命令。 .
关键字是 POSIX 兼容的,等同于 bash source
命令。