如何使用 Docker 在 Airflow 中使用 .env 变量设置 S3 连接?

How to setup S3 connection using .env variables in Airflow using Docker?

之前有人问过类似的问题how-to-programmatically-set-up-airflow-1-10-logging-with-localstack-s3-endpoint但是没有解决。

我在 Docker 容器中安装了 Airflow 运行,它是使用 docker-compose 设置的,我遵循了这个 guide。现在我想从 S3 存储桶下载一些数据,但我需要设置凭据以允许这样做。在任何地方,这似乎只能通过手动设置 AWS_ACCESS_KEY_IDAWS_SECRET_ACCESS_KEY 使用 UI 来完成,这在 UI 中公开了这些,我想在代码本身中进行设置通过读取 ENV 变量。在 boto3 中,这将使用:

import boto3
session = boto3.Session(
    aws_access_key_id=settings.AWS_SERVER_PUBLIC_KEY,
    aws_secret_access_key=settings.AWS_SERVER_SECRET_KEY,
)

那么我该如何在 DAGS 的代码中执行此操作?

Code:

import traceback
import airflow
from airflow import DAG
from airflow.exceptions import AirflowFailException
from airflow.operators.python import PythonOperator

from airflow.providers.amazon.aws.hooks.s3 import S3Hook

def _download_s3_data(templates_dict, **context):
    # contains a list of the values returned
    data = templates_dict.get("sagemaker_autopilot_data")
    if any([not paths for paths in data]):
        raise AirflowFailException("Some of the paths were not passed!")
    else:
        (
            sagemaker_training,
            sagemaker_testing,
        ) = data
        s3hook = S3Hook()
        # parse the s3 url
        bucket_name, key = s3hook.parse_s3_url(s3url=sagemaker_training)
        try:
            # need aws credentials
            file_name = s3hook.download_file(key=key, bucket_name=bucket_name)
        except:
            traceback.print_exc()
            raise AirflowFailException("Error downloading s3 file")

ENV file:

AWS_ACCESS_KEY_ID=
AWS_SECRET_ACCESS_KEY=

编辑:

Amazon Web Services Connection 似乎是关于它的唯一文档,但它有点令人困惑,并且没有提到如何以编程方式执行此操作。

S3Hook 以aws_conn_id 为参数。您只需为您的气流安装定义一次连接,然后您就可以在您的挂钩中使用该连接。

连接的默认名称是 aws_default(参见 https://airflow.apache.org/docs/apache-airflow-providers-amazon/stable/connections/aws.html#default-connection-ids)。只需先创建连接(或编辑,如果它已经存在) - 通过 Airflow UI 或通过环境变量或通过 Secret Backends

这是描述您可以使用的所有选项的文档:

https://airflow.apache.org/docs/apache-airflow/stable/howto/connection.html

https://airflow.apache.org/docs/apache-airflow-providers-amazon/stable/connections/aws.html中所述-连接中的登录名用作AWS_ACCESS_KEY_ID,密码用作AWS_SECRET_ACCESS_KEY,但Airflow UI中的AWS连接是自定义并通过自定义字段显示提示和选项,因此您可以轻松地从 UI.

开始

定义连接后,S3 Hook 将读取存储在其使用的连接中的凭据(因此默认情况下:aws_default)。您还可以定义多个具有不同 ID 的 AWS 连接,并在创建 hoook 时将这些连接 ID 作为 aws_conn_id 参数传递。

只是为了补充@Jarek Potiuk 的回答,这就是我最终所做的。

1。使用以下变量创建一个 .env 文件

AIRFLOW_UIDAIRFLOW_GID 通过 运行 获得以下 bash 命令 echo -e "AIRFLOW_UID=$(id -u)\nAIRFLOW_GID=0" > .env。其他 AWS 变量特定于我正在使用的临时凭证。

如果您希望我的凭据基于此特定区域,您可以排除 REGION_NAME

AIRFLOW_UID=
AIRFLOW_GID=
REGION_NAME=us-east-1
AWS_SESSION_TOKEN=
AWS_ACCESS_KEY_ID=
AWS_SECRET_ACCESS_KEY=

2。将环境变量添加到 docker-compose.yaml

我使用了这个模板,您可以在根目录中通过 运行ning curl -LfO 'https://airflow.apache.org/docs/apache-airflow/2.1.2/docker-compose.yaml' 直接获取。

version: "3"
x-airflow-common: &airflow-common
  image: ${AIRFLOW_IMAGE_NAME:-apache/airflow:2.1.2}
  environment: &airflow-common-env
    AIRFLOW__CORE__EXECUTOR: CeleryExecutor
    AIRFLOW__CORE__SQL_ALCHEMY_CONN: postgresql+psycopg2://airflow:airflow@postgres/airflow
    AIRFLOW__CELERY__RESULT_BACKEND: db+postgresql://airflow:airflow@postgres/airflow
    AIRFLOW__CELERY__BROKER_URL: redis://:@redis:6379/0
    AIRFLOW__CORE__FERNET_KEY: ""
    AIRFLOW__CORE__DAGS_ARE_PAUSED_AT_CREATION: "true"
    AIRFLOW__CORE__LOAD_EXAMPLES: "true"
    AIRFLOW__API__AUTH_BACKEND: "airflow.api.auth.backend.basic_auth"
    _PIP_ADDITIONAL_REQUIREMENTS: ${_PIP_ADDITIONAL_REQUIREMENTS:-}
    # Add env variables here!
    AWS_ACCESS_KEY_ID: ${AWS_ACCESS_KEY_ID}
    AWS_SECRET_ACCESS_KEY: ${AWS_SECRET_ACCESS_KEY}
    AWS_SESSION_TOKEN: ${AWS_SESSION_TOKEN}
    REGION_NAME: ${REGION_NAME}

3。然后 DAG 的第一部分是 运行 一个使用 Generating a connection URI 指南建立连接的 PythonOperator。

从这里 link 您打算 运行 完成的所有其他任务。

import os
import json
from airflow.models.connection import Connection
from airflow.exceptions import AirflowFailException

def _create_connection(**context):
    """
    Sets the connection information about the environment using the Connection
    class instead of doing it manually in the Airflow UI
    """
    AWS_ACCESS_KEY_ID = os.getenv("AWS_ACCESS_KEY_ID")
    AWS_SECRET_ACCESS_KEY = os.getenv("AWS_SECRET_ACCESS_KEY")
    AWS_SESSION_TOKEN = os.getenv("AWS_SESSION_TOKEN")
    REGION_NAME = os.getenv("REGION_NAME")
    credentials = [
        AWS_SESSION_TOKEN,
        AWS_ACCESS_KEY_ID,
        AWS_SECRET_ACCESS_KEY,
        REGION_NAME,
    ]
    if not credentials or any(not credential for credential in credentials):
        raise AirflowFailException("Environment variables were not passed")

    extras = json.dumps(
        dict(
            aws_session_token=AWS_SESSION_TOKEN,
            aws_access_key_id=AWS_ACCESS_KEY_ID,
            aws_secret_access_key=AWS_SECRET_ACCESS_KEY,
            region_name=REGION_NAME,
        ),
    )
    try:
        Connection(
            conn_id="s3_con",
            conn_type="S3",
            extra=extras,
        )
    except Exception as e:
        raise AirflowFailException(
            f"Error creating connection to Airflow :{e}",
        )