如何将airflow连接到minio s3

How to connect airflow to minio s3

我正在尝试 运行 docker 带有 airflow 和 minio 的容器,并将 airflow 任务连接到 minio 中定义的桶。我正在使用新版本 - airflow 2.1.3 和最新的 minio 图像。

我如何在 minio 中获取连接的访问​​密钥和访问密钥? 如何定义气流中的连接?

我尝试了多种方法和设置,但我不断得到:botocore.exceptions.ClientError: An error occurred (403) when calling the HeadObject operation: Forbidden

我通过 UI 将连接定义为:

conn type: s3
host: locals3 (name of the service in docker-compose)
login: user (also minio_root_user)
password: password  (also minio_root_password)
port: 9000

这是我用来测试连接的任务 ():

sensor = S3KeySensor(
    task_id='check_s3_for_file_in_s3',
    bucket_key='test',
    bucket_name='airflow-data',
    # aws_conn_id="aws_default",
    timeout=18 * 60 * 60,
    poke_interval=120,
    dag=dag)

谢谢。

编辑: Docker-撰写文件:

version: '3.8'

# ====================================== AIRFLOW ENVIRONMENT VARIABLES =======================================
x-environment: &airflow_environment
  - AIRFLOW__API__AUTH_BACKEND=airflow.api.auth.backend.basic_auth
  - AIRFLOW__CORE__EXECUTOR=LocalExecutor
  - AIRFLOW__CORE__LOAD_DEFAULT_CONNECTIONS=False
  - AIRFLOW__CORE__LOAD_EXAMPLES=False
  - AIRFLOW__CORE__SQL_ALCHEMY_CONN=postgresql://airflow:airflow@postgres:5432/airflow
  - AIRFLOW__CORE__STORE_DAG_CODE=True
  - AIRFLOW__CORE__STORE_SERIALIZED_DAGS=True
  - AIRFLOW__WEBSERVER__EXPOSE_CONFIG=True

x-airflow-image: &airflow_image apache/airflow:2.1.3-python3.8
# ====================================== /AIRFLOW ENVIRONMENT VARIABLES =======================================

services:
  postgres:
    image: postgres:13-alpine
    healthcheck:
      test: [ "CMD", "pg_isready", "-U", "airflow" ]
      interval: 5s
      retries: 5
    environment:
      - POSTGRES_USER=airflow
      - POSTGRES_PASSWORD=airflow
      - POSTGRES_DB=airflow
    ports:
      - "5432:5432"

  init:
    image: *airflow_image
    depends_on:
      - postgres
    environment: *airflow_environment
    entrypoint: /bin/bash
    command: -c 'airflow db init && airflow users create --username user --password password --firstname Marin --lastname Marin --role Admin --email admin@example.org'

  webserver:
    image: *airflow_image
    restart: always
    depends_on:
      - postgres
    ports:
      - "8080:8080"
    volumes:
      - logs:/opt/airflow/logs
    environment: *airflow_environment
    command: webserver

  scheduler:
    build:
      context: docker
      args:
        AIRFLOW_BASE_IMAGE: *airflow_image
    #    image: *airflow_image
    restart: always
    depends_on:
      - postgres
    volumes:
      - logs:/opt/airflow/logs
      - ./dags:/opt/airflow/dags
    environment: *airflow_environment
    command: scheduler

  locals3:
    image: minio/minio
    ports:
      - "9000:9000"
      - "9001:9001"
    environment:
      - MINIO_ROOT_USER=user
      - MINIO_ROOT_PASSWORD=password
    command: "server --console-address :9001 /data"
    volumes:
      - "locals3-data:/data"
    healthcheck:
      test: [ "CMD", "curl", "-f", "http://localhost:9000/minio/health/live" ]
      interval: 30s
      timeout: 20s
      retries: 3

  locals3_init:
    image: minio/mc
    depends_on:
      - locals3
    entrypoint: >
      /bin/sh -c "
      while ! /usr/bin/mc config host add locals3 http://locals3:9000 user password; do echo 'MinIO not up and running yet...' && sleep 1; done;
      echo 'Added mc host config.';
      /usr/bin/mc mb locals3/airflow-data;
      exit 0;
      "

volumes:
  logs:
  locals3-data:

我希望这可能对某人有所帮助。所以我所做的是我创建了一个用户并通过 UI 使用该用户登录。之后,通过 UI 我为该用户创建了一个服务帐户,该帐户生成了一个密钥和访问密钥。

我更改的 docker-compose 文件部分现在如下所示:

 locals3_init:
    image: minio/mc
    depends_on:
      - locals3
    entrypoint: >
      /bin/sh -c "
      while ! /usr/bin/mc config host add locals3 http://locals3:9000 user password; do echo 'MinIO not up and running yet...' && sleep 1; done;
      echo 'Added mc host config.';
      /usr/bin/mc admin user add locals3 airflow airflow_secret;
      echo 'Added user airflow.';
      /usr/bin/mc admin policy set locals3 readwrite user=airflow;
      /usr/bin/mc mb locals3/data;
      /usr/bin/mc alias set locals3 http://locals3 9RTK1ISXS13J85I4U6JS 4z+akfubnu+XZuoCXhqGwrtq+jgK2AYcrgGH5zsQ --api s3v4;
      exit 0;
      "

也许可以清理一些 mc 命令。

之后我添加了一个连接到气流。我在登录字段中添加了密钥,在密码字段中添加了秘密访问密钥。对于连接类型,我选择了 S3。

现在,在主机字段中添加容器名称(在我的示例中为 locals3),在端口字段中添加端口不起作用。我通过 extras 添加了主机和端口:

{
    "host": "http://locals3:9000"
}

之后我就可以连接了。

我不确定如果我通过 root 用户添加服务帐户或使用 root 凭据,连接是否有效,因为我还没有对此进行测试。

编辑:

使用 root 用户凭据进行测试并且有效。所以问题似乎出在主机和端口的定义方式上。

编辑 2:

比较两个连接字符串:

  1. 添加主机和端口作为额外值:

    s3://user:password@?host=http%3A%2F%2Flocals3%3A9000

  2. 通过字段添加主机和端口:

    s3://user:password@locals3:9000

我能找到为什么第一个有效而第二个无效的唯一解释是因为第二个中的字符没有 url 格式。