Airflow BashOperator - 使用与其 pod 角色不同的角色

Airflow BashOperator - Use different role then its pod role

我已经尝试 运行 以下命令作为 BashOperator 中 bash 脚本 运行s 的一部分:

aws cli ls s3://bucket
aws cli cp ... ...

脚本 运行 成功,但是 aws cli 命令 return 错误,表明 aws cli 没有 运行 所需的权限(如 airflow-worker-node 角色中所定义)

正在调查错误:

  1. 我已经将 docker 运行 中的 awscli 升级到 2.4.9 版(我知道旧版本 awscli 不支持基于 aws role

    授予的权限访问 s3
  2. 我已经使用 BashOperator 调查了 pod 运行ning 我的 bash_script:

  1. 运行 作为 bash 脚本的一部分的上述命令在 BashOperator 中执行给了我不同的结果:

    • 运行 env 显示的环境变量数量有限
    • aws cli returned 权限相关错误。
    • aws sts get-caller-identity - 报告了 eks 角色 (eks-worker-node)

如何在我的 BashOperator bash-script 中授予 aws cli 所需的权限?

查看 BashOperator 源代码,我注意到以下代码:

https://github.com/apache/airflow/blob/main/airflow/operators/bash.py

def get_env(self, context):
    """Builds the set of environment variables to be exposed for the bash command"""
    system_env = os.environ.copy()
    env = self.env
    if env is None:
        env = system_env
    else:
        if self.append_env:
            system_env.update(env)
            env = system_env

以及以下文档:

:param env: If env is not None, it must be a dict that defines the
    environment variables for the new process; these are used instead
    of inheriting the current process environment, which is the default
    behavior. (templated)
:type env: dict
:param append_env: If False(default) uses the environment variables passed in env params
    and does not inherit the current process environment. If True, inherits the environment variables
    from current passes and then environment variable passed by the user will either update the existing
    inherited environment variables or the new variables gets appended to it
:type append_env: bool

如果bash操作员输入的env变量是None,它复制父进程的env变量。 在我的例子中,我提供了一些环境变量,因此它没有将父进程的环境变量复制到 chid 进程中——这导致子进程(BashOperator 进程)使用 eks-worker 的默认 arn_role -node.

简单的解决方案是在 BashOperator() 中设置以下标志:append_env=True 这会将所有现有的 env 变量附加到我手动添加的环境变量中。

我发现我的运行(2.0.1)版本不支持(以后的版本支持)。 作为临时解决方案,我将 **os.environ - 添加到 BashOperator env 参数:

return BashOperator(
    task_id="copy_data_from_mcd_s3",
    env={
        "dag_input": "{{ dag_run.conf }}",
        ......
        **os.environ,
    },
    # append_env=True,- should be supported in 2.2.0
    bash_command="utils/my_script.sh",
    dag=dag,
    retries=1,
)

哪个问题解决了。