如何使用 Docker 在 Airflow 中使用 .env 变量设置 S3 连接?
How to setup S3 connection using .env variables in Airflow using Docker?
之前有人问过类似的问题how-to-programmatically-set-up-airflow-1-10-logging-with-localstack-s3-endpoint但是没有解决。
我在 Docker 容器中安装了 Airflow 运行,它是使用 docker-compose 设置的,我遵循了这个 guide。现在我想从 S3 存储桶下载一些数据,但我需要设置凭据以允许这样做。在任何地方,这似乎只能通过手动设置 AWS_ACCESS_KEY_ID
和 AWS_SECRET_ACCESS_KEY
使用 UI 来完成,这在 UI 中公开了这些,我想在代码本身中进行设置通过读取 ENV 变量。在 boto3 中,这将使用:
import boto3
session = boto3.Session(
aws_access_key_id=settings.AWS_SERVER_PUBLIC_KEY,
aws_secret_access_key=settings.AWS_SERVER_SECRET_KEY,
)
那么我该如何在 DAGS 的代码中执行此操作?
Code:
import traceback
import airflow
from airflow import DAG
from airflow.exceptions import AirflowFailException
from airflow.operators.python import PythonOperator
from airflow.providers.amazon.aws.hooks.s3 import S3Hook
def _download_s3_data(templates_dict, **context):
# contains a list of the values returned
data = templates_dict.get("sagemaker_autopilot_data")
if any([not paths for paths in data]):
raise AirflowFailException("Some of the paths were not passed!")
else:
(
sagemaker_training,
sagemaker_testing,
) = data
s3hook = S3Hook()
# parse the s3 url
bucket_name, key = s3hook.parse_s3_url(s3url=sagemaker_training)
try:
# need aws credentials
file_name = s3hook.download_file(key=key, bucket_name=bucket_name)
except:
traceback.print_exc()
raise AirflowFailException("Error downloading s3 file")
ENV file:
AWS_ACCESS_KEY_ID=
AWS_SECRET_ACCESS_KEY=
编辑:
Amazon Web Services Connection 似乎是关于它的唯一文档,但它有点令人困惑,并且没有提到如何以编程方式执行此操作。
S3Hook 以aws_conn_id
为参数。您只需为您的气流安装定义一次连接,然后您就可以在您的挂钩中使用该连接。
连接的默认名称是 aws_default
(参见 https://airflow.apache.org/docs/apache-airflow-providers-amazon/stable/connections/aws.html#default-connection-ids)。只需先创建连接(或编辑,如果它已经存在) - 通过 Airflow UI 或通过环境变量或通过 Secret Backends
这是描述您可以使用的所有选项的文档:
https://airflow.apache.org/docs/apache-airflow/stable/howto/connection.html
如https://airflow.apache.org/docs/apache-airflow-providers-amazon/stable/connections/aws.html中所述-连接中的登录名用作AWS_ACCESS_KEY_ID,密码用作AWS_SECRET_ACCESS_KEY,但Airflow UI中的AWS连接是自定义并通过自定义字段显示提示和选项,因此您可以轻松地从 UI.
开始
定义连接后,S3 Hook 将读取存储在其使用的连接中的凭据(因此默认情况下:aws_default
)。您还可以定义多个具有不同 ID 的 AWS 连接,并在创建 hoook 时将这些连接 ID 作为 aws_conn_id
参数传递。
只是为了补充@Jarek Potiuk 的回答,这就是我最终所做的。
1。使用以下变量创建一个 .env
文件
AIRFLOW_UID
和 AIRFLOW_GID
通过 运行 获得以下 bash
命令 echo -e "AIRFLOW_UID=$(id -u)\nAIRFLOW_GID=0" > .env
。其他 AWS 变量特定于我正在使用的临时凭证。
如果您希望我的凭据基于此特定区域,您可以排除 REGION_NAME
。
AIRFLOW_UID=
AIRFLOW_GID=
REGION_NAME=us-east-1
AWS_SESSION_TOKEN=
AWS_ACCESS_KEY_ID=
AWS_SECRET_ACCESS_KEY=
2。将环境变量添加到 docker-compose.yaml
我使用了这个模板,您可以在根目录中通过 运行ning curl -LfO 'https://airflow.apache.org/docs/apache-airflow/2.1.2/docker-compose.yaml'
直接获取。
version: "3"
x-airflow-common: &airflow-common
image: ${AIRFLOW_IMAGE_NAME:-apache/airflow:2.1.2}
environment: &airflow-common-env
AIRFLOW__CORE__EXECUTOR: CeleryExecutor
AIRFLOW__CORE__SQL_ALCHEMY_CONN: postgresql+psycopg2://airflow:airflow@postgres/airflow
AIRFLOW__CELERY__RESULT_BACKEND: db+postgresql://airflow:airflow@postgres/airflow
AIRFLOW__CELERY__BROKER_URL: redis://:@redis:6379/0
AIRFLOW__CORE__FERNET_KEY: ""
AIRFLOW__CORE__DAGS_ARE_PAUSED_AT_CREATION: "true"
AIRFLOW__CORE__LOAD_EXAMPLES: "true"
AIRFLOW__API__AUTH_BACKEND: "airflow.api.auth.backend.basic_auth"
_PIP_ADDITIONAL_REQUIREMENTS: ${_PIP_ADDITIONAL_REQUIREMENTS:-}
# Add env variables here!
AWS_ACCESS_KEY_ID: ${AWS_ACCESS_KEY_ID}
AWS_SECRET_ACCESS_KEY: ${AWS_SECRET_ACCESS_KEY}
AWS_SESSION_TOKEN: ${AWS_SESSION_TOKEN}
REGION_NAME: ${REGION_NAME}
3。然后 DAG 的第一部分是 运行 一个使用 Generating a connection URI 指南建立连接的 PythonOperator。
从这里 link 您打算 运行 完成的所有其他任务。
import os
import json
from airflow.models.connection import Connection
from airflow.exceptions import AirflowFailException
def _create_connection(**context):
"""
Sets the connection information about the environment using the Connection
class instead of doing it manually in the Airflow UI
"""
AWS_ACCESS_KEY_ID = os.getenv("AWS_ACCESS_KEY_ID")
AWS_SECRET_ACCESS_KEY = os.getenv("AWS_SECRET_ACCESS_KEY")
AWS_SESSION_TOKEN = os.getenv("AWS_SESSION_TOKEN")
REGION_NAME = os.getenv("REGION_NAME")
credentials = [
AWS_SESSION_TOKEN,
AWS_ACCESS_KEY_ID,
AWS_SECRET_ACCESS_KEY,
REGION_NAME,
]
if not credentials or any(not credential for credential in credentials):
raise AirflowFailException("Environment variables were not passed")
extras = json.dumps(
dict(
aws_session_token=AWS_SESSION_TOKEN,
aws_access_key_id=AWS_ACCESS_KEY_ID,
aws_secret_access_key=AWS_SECRET_ACCESS_KEY,
region_name=REGION_NAME,
),
)
try:
Connection(
conn_id="s3_con",
conn_type="S3",
extra=extras,
)
except Exception as e:
raise AirflowFailException(
f"Error creating connection to Airflow :{e}",
)
之前有人问过类似的问题how-to-programmatically-set-up-airflow-1-10-logging-with-localstack-s3-endpoint但是没有解决。
我在 Docker 容器中安装了 Airflow 运行,它是使用 docker-compose 设置的,我遵循了这个 guide。现在我想从 S3 存储桶下载一些数据,但我需要设置凭据以允许这样做。在任何地方,这似乎只能通过手动设置 AWS_ACCESS_KEY_ID
和 AWS_SECRET_ACCESS_KEY
使用 UI 来完成,这在 UI 中公开了这些,我想在代码本身中进行设置通过读取 ENV 变量。在 boto3 中,这将使用:
import boto3
session = boto3.Session(
aws_access_key_id=settings.AWS_SERVER_PUBLIC_KEY,
aws_secret_access_key=settings.AWS_SERVER_SECRET_KEY,
)
那么我该如何在 DAGS 的代码中执行此操作?
Code:
import traceback
import airflow
from airflow import DAG
from airflow.exceptions import AirflowFailException
from airflow.operators.python import PythonOperator
from airflow.providers.amazon.aws.hooks.s3 import S3Hook
def _download_s3_data(templates_dict, **context):
# contains a list of the values returned
data = templates_dict.get("sagemaker_autopilot_data")
if any([not paths for paths in data]):
raise AirflowFailException("Some of the paths were not passed!")
else:
(
sagemaker_training,
sagemaker_testing,
) = data
s3hook = S3Hook()
# parse the s3 url
bucket_name, key = s3hook.parse_s3_url(s3url=sagemaker_training)
try:
# need aws credentials
file_name = s3hook.download_file(key=key, bucket_name=bucket_name)
except:
traceback.print_exc()
raise AirflowFailException("Error downloading s3 file")
ENV file:
AWS_ACCESS_KEY_ID=
AWS_SECRET_ACCESS_KEY=
编辑:
Amazon Web Services Connection 似乎是关于它的唯一文档,但它有点令人困惑,并且没有提到如何以编程方式执行此操作。
S3Hook 以aws_conn_id
为参数。您只需为您的气流安装定义一次连接,然后您就可以在您的挂钩中使用该连接。
连接的默认名称是 aws_default
(参见 https://airflow.apache.org/docs/apache-airflow-providers-amazon/stable/connections/aws.html#default-connection-ids)。只需先创建连接(或编辑,如果它已经存在) - 通过 Airflow UI 或通过环境变量或通过 Secret Backends
这是描述您可以使用的所有选项的文档:
https://airflow.apache.org/docs/apache-airflow/stable/howto/connection.html
如https://airflow.apache.org/docs/apache-airflow-providers-amazon/stable/connections/aws.html中所述-连接中的登录名用作AWS_ACCESS_KEY_ID,密码用作AWS_SECRET_ACCESS_KEY,但Airflow UI中的AWS连接是自定义并通过自定义字段显示提示和选项,因此您可以轻松地从 UI.
开始定义连接后,S3 Hook 将读取存储在其使用的连接中的凭据(因此默认情况下:aws_default
)。您还可以定义多个具有不同 ID 的 AWS 连接,并在创建 hoook 时将这些连接 ID 作为 aws_conn_id
参数传递。
只是为了补充@Jarek Potiuk 的回答,这就是我最终所做的。
1。使用以下变量创建一个 .env
文件
AIRFLOW_UID
和 AIRFLOW_GID
通过 运行 获得以下 bash
命令 echo -e "AIRFLOW_UID=$(id -u)\nAIRFLOW_GID=0" > .env
。其他 AWS 变量特定于我正在使用的临时凭证。
如果您希望我的凭据基于此特定区域,您可以排除 REGION_NAME
。
AIRFLOW_UID=
AIRFLOW_GID=
REGION_NAME=us-east-1
AWS_SESSION_TOKEN=
AWS_ACCESS_KEY_ID=
AWS_SECRET_ACCESS_KEY=
2。将环境变量添加到 docker-compose.yaml
我使用了这个模板,您可以在根目录中通过 运行ning curl -LfO 'https://airflow.apache.org/docs/apache-airflow/2.1.2/docker-compose.yaml'
直接获取。
version: "3"
x-airflow-common: &airflow-common
image: ${AIRFLOW_IMAGE_NAME:-apache/airflow:2.1.2}
environment: &airflow-common-env
AIRFLOW__CORE__EXECUTOR: CeleryExecutor
AIRFLOW__CORE__SQL_ALCHEMY_CONN: postgresql+psycopg2://airflow:airflow@postgres/airflow
AIRFLOW__CELERY__RESULT_BACKEND: db+postgresql://airflow:airflow@postgres/airflow
AIRFLOW__CELERY__BROKER_URL: redis://:@redis:6379/0
AIRFLOW__CORE__FERNET_KEY: ""
AIRFLOW__CORE__DAGS_ARE_PAUSED_AT_CREATION: "true"
AIRFLOW__CORE__LOAD_EXAMPLES: "true"
AIRFLOW__API__AUTH_BACKEND: "airflow.api.auth.backend.basic_auth"
_PIP_ADDITIONAL_REQUIREMENTS: ${_PIP_ADDITIONAL_REQUIREMENTS:-}
# Add env variables here!
AWS_ACCESS_KEY_ID: ${AWS_ACCESS_KEY_ID}
AWS_SECRET_ACCESS_KEY: ${AWS_SECRET_ACCESS_KEY}
AWS_SESSION_TOKEN: ${AWS_SESSION_TOKEN}
REGION_NAME: ${REGION_NAME}
3。然后 DAG 的第一部分是 运行 一个使用 Generating a connection URI 指南建立连接的 PythonOperator。
从这里 link 您打算 运行 完成的所有其他任务。
import os
import json
from airflow.models.connection import Connection
from airflow.exceptions import AirflowFailException
def _create_connection(**context):
"""
Sets the connection information about the environment using the Connection
class instead of doing it manually in the Airflow UI
"""
AWS_ACCESS_KEY_ID = os.getenv("AWS_ACCESS_KEY_ID")
AWS_SECRET_ACCESS_KEY = os.getenv("AWS_SECRET_ACCESS_KEY")
AWS_SESSION_TOKEN = os.getenv("AWS_SESSION_TOKEN")
REGION_NAME = os.getenv("REGION_NAME")
credentials = [
AWS_SESSION_TOKEN,
AWS_ACCESS_KEY_ID,
AWS_SECRET_ACCESS_KEY,
REGION_NAME,
]
if not credentials or any(not credential for credential in credentials):
raise AirflowFailException("Environment variables were not passed")
extras = json.dumps(
dict(
aws_session_token=AWS_SESSION_TOKEN,
aws_access_key_id=AWS_ACCESS_KEY_ID,
aws_secret_access_key=AWS_SECRET_ACCESS_KEY,
region_name=REGION_NAME,
),
)
try:
Connection(
conn_id="s3_con",
conn_type="S3",
extra=extras,
)
except Exception as e:
raise AirflowFailException(
f"Error creating connection to Airflow :{e}",
)