使用 Docker swarm 使用 Celery 集群扩展 Airflow

Scaling Airflow with a Celery cluster using Docker swarm

正如标题所说,我想使用 Docker swarm 在集群(1 个主节点,2 个节点)上设置 运行 的 Airflow。

当前设置:

现在我有 Airflow 设置,它使用在单个 EC2 上 运行ning 的 CeleryExecutor。 我有一个 Docker 文件,可以提取 Airflow 的图像和 pip install -r requirements.txt。 从这个 Docker 文件我正在创建一个本地图像,这个图像被用于 docker-compose.yml 中,它启动了 Airflow 需要的不同服务(网络服务器、调度程序、redis、花和一些 worker.metadb 是位于单独 RDS 上的 Postgres)。 docker-compose 用于 docker 集群模式,即。 docker stack deploy . airflow_stack

所需设置:

我想将当前设置扩展到 3 个 EC2(1 个主节点,2 个节点),主节点将 运行 网络服务器、调度程序、redis 和花,工人将 运行 在节点中。 经过搜索和网络和文档,有一些我仍然不清楚的事情我很想知道

  1. 根据我的理解,为了让节点 运行 工作人员,我从 Docker 文件构建的本地图像需要被推送到某个存储库(如果它是真的需要,我会使用 AWS ECR) 让气流工作人员能够从该图像创建容器。对吗?
  2. 正在同步卷和 env 文件,现在,我正在装载卷并将 envs 插入 docker-compose 文件。这些挂载和环境会同步到节点(和气流工作者容器)吗?如果没有,如何确保一切都同步,因为气流要求所有组件(redis 除外)都具有所有依赖项,等等
  3. 使用 CeleryExecuter 时需要设置的环境之一是 broker_url,我如何确保节点识别主服务器上的 redis 代理

我确定还有一些我忘记了的东西,但我写的是一个好的开始。 任何帮助或建议将不胜感激

谢谢!

Docker文件:

FROM apache/airflow:2.1.3-python3.9
USER root


RUN apt update;
RUN apt -y install build-essential;

USER airflow
COPY requirements.txt requirements.txt
COPY requirements.airflow.txt requirements.airflow.txt

RUN pip install --upgrade pip;
RUN pip install --upgrade wheel;

RUN pip install -r requirements.airflow.txt
RUN pip install -r requirements.txt


EXPOSE 8793 8786 8787

docker-compose.yml:

version: '3.8'
x-airflow-celery: &airflow-celery
  image: local_image:latest
  volumes:
    -some_volume
  env_file:
    -some_env_file

services:
  webserver:
    <<: *airflow-celery
    command: airflow webserver
    restart: always
    ports:
      - 80:8080
    healthcheck:
      test: [ "CMD-SHELL", "[ -f /opt/airflow/airflow-webserver.pid ]" ]
      interval: 10s
      timeout: 30s
      retries: 3

  scheduler:
    <<: *airflow-celery
    command: airflow scheduler
    restart: always
    deploy:
      replicas: 2

  redis:
    image: redis:6.0
    command: redis-server --include /redis.conf
    healthcheck:
      test: [ "CMD", "redis-cli", "ping" ]
      interval: 30s
      timeout: 10s
      retries: 5
    ports:
      - 6379:6379
    environment:
      - REDIS_PORT=6379

  worker:
    <<: *airflow-celery
    command: airflow celery worker
    deploy:
      replicas: 16

  flower:
    <<: *airflow-celery
    command: airflow celery flower
    ports:
      - 5555:5555

听起来您正朝着正确的方向前进(尽管最后有一条一般性评论)。

  1. 是的,您需要将镜像推送到容器注册表并通过 public(如果您进行身份验证则为私有)标签引用它。这种情况下的标签通常是 registry/name:tag。例如,您可以在此处看到 Airlfow 的 CI 图片之一:https://github.com/apache/airflow/pkgs/container/airflow%2Fmain%2Fci%2Fpython3.9 - 目的有点不同(我们将它用于我们的 CI 构建)但机制是相同的:您在本地构建它,使用“registry/image:tag”docker build . --tag registry/image:tag 和 运行 docker push registry/image:tag 进行标记。 然后,每当您从 docker 撰写中引用它时,通过 registry/image:tag、docker compose/swarm 将拉取正确的图像。只需确保在构建图像时制作唯一的 TAG,以了解您推送的图像(并考虑未来的图像)。

  2. Env 文件应该没问题,它们会分布在各个实例中,但本地安装的卷不会。您要么需要一些共享文件系统(如 NFS,如果您使用 AWS,则可能是 EFS)来存储 DAG,或者使用其他一些同步方法来分发 DAG。它可以是例如 git-sync - 它具有非常好的属性,特别是如果您使用 Git 存储 DAG 文件,或将 DAG 烘焙到图像中(这需要在图像更改时重新推送图像) ).您可以在我们的 Helm Chart https://airflow.apache.org/docs/helm-chart/stable/manage-dags-files.html

    中查看不同选项的解释
  3. 您不能使用 localhost 您需要将其设置为特定主机并确保您的代理 URL 可以从所有实例访问。这可以通过将特定 IP address/DNS 名称分配给您的 'broker' 实例并在防火墙中打开正确的端口(确保您控制可以从哪里访问这些端口)甚至可能使用一些负载来完成-平衡。

我不太了解 DockerSwarm 设置它的难易程度,但说实话,这似乎是一项大量的工作,需要手动完成。

我强烈、非常强烈地鼓励您使用 Kubernetes 和 Airlfow 社区开发的 Helm Chart:https://airflow.apache.org/docs/helm-chart/stable/index.html。有很多问题和必要的配置要么在 K8S 中解决(扩展、共享文件系统 - PV、网络和连接、资源管理等),要么由我们的 Helm(Git-Sync 端容器、代理配置等)解决.)

I 运行 Docker Swarm 上的 Airflow CeleryExecutor。

假设您在节点上设置了 Docker Swarm,您可以执行以下操作:

  1. 像这样将共享卷映射到 NFS 文件夹(同样适用于插件和日志,或您需要共享的任何其他内容)
volumes:
  dags:
    driver_opts:
      type: "none"
      o: "bind"
      device: "/nfs/airflow/dags"
  1. 我个人使用 Docker Secrets 来处理我的网络服务器密码、数据库密码等(同样,我使用 Docker 配置来传递我的芹菜和网络服务器配置)
secrets:
  postgresql_password:
    external: true
  fernet_key:
    external: true
  webserver_password:
    external: true

为了让 Airflow 读取机密,我添加了一个简单的 bash 脚本,该脚本被添加到 entrypoint.sh 脚本中。所以在我的堆栈文件中我不需要硬编码任何密码,但是如果 DOCKER-SECRET 字符串可用,那么它会在 /run/secrets/ 中查找(我想我在设置它时用这个作为例子向上 https://gist.github.com/bvis/b78c1e0841cfd2437f03e20c1ee059fe)

在我的入口点脚本中,我添加了扩展 Docker 秘密的脚本:

source /env_secrets_expand.sh

x-airflow-variables: &airflow-variables
    AIRFLOW__CORE__EXECUTOR: CeleryExecutor
    ...
    AIRFLOW__WEBSERVER__SECRET_KEY: DOCKER-SECRET->webserver_secret_key

这也是 postgres 图像的设置方式,没有环境变量:

services:
  postgres:
    image: postgres:11.5
    secrets:
      - source: postgresql_password
        target: /run/secrets/postgresql_password
    environment:
      - POSTGRES_USER=airflow
      - POSTGRES_DB=airflow
      - POSTGRES_PASSWORD_FILE=/run/secrets/postgresql_password
  1. 您显然可以使用 Swarm 标签或主机名来确定某个服务应该使用哪些节点 运行
  scheduler:
    <<: *airflow-common
    environment: *airflow-variables
    command: scheduler
    deploy:
      replicas: 2
      mode: replicated
      placement:
        constraints:
          - node.labels.type == worker
      restart_policy:
        condition: on-failure
        delay: 5s
        max_attempts: 3
        window: 120s
    logging:
      driver: fluentd
      options:
        tag: docker.airflow.scheduler
        fluentd-async-connect: "true"

对于 Celery 工作人员,我有我的默认队列,然后是一个特殊队列,由于历史原因固定到单个节点(客户端已经将这个特定的 IP 地址列入白名单,所以我需要确保任务只 运行 在该节点上)。所以我的入口点 运行s exec airflow celery "$@" -q "$QUEUE_NAME",我的堆栈文件是这样的:

  worker_default:
    <<: *airflow-common
    environment:
      <<: *airflow-variables
      QUEUE_NAME: default
    command: worker
    deploy:
      replicas: 3
      mode: replicated
      placement:
        constraints:
          - node.labels.type == worker

  worker_nodename:
    <<: *airflow-common
    environment:
      <<: *airflow-variables
      QUEUE_NAME: nodename
    command: worker
    deploy:
      replicas: 1
      mode: replicated
      placement:
        constraints:
          - node.hostname == nodename

我使用 Gitlab CI/CD 来部署我的 DAGs/plugins 每当我合并到 main 时,如果我更新 Docker 文件或其他某些文件,我会构建图像并部署服务.我已经 运行 以这种方式使用 Airflow 几年了(2017 年或 2018 年),但我确实计划最终切换到 Kubernetes,因为这似乎是更标准的方法。