Unwanted Warm Shutdown (MainProcess) of node worker in airflow docker swarm

I am currently setting up remote workers for Apache Airflow on AWS EC2 instances via docker swarm.

The remote worker shuts down every 60 seconds for no apparent reason with the following error:

BACKEND=postgresql+psycopg2
DB_HOST=postgres
DB_PORT=5432

BACKEND=postgresql+psycopg2
DB_HOST=postgres
DB_PORT=5432

/home/airflow/.local/lib/python3.7/site-packages/airflow/configuration.py:813: DeprecationWarning: Specifying both AIRFLOW_HOME environment variable and airflow_home in the config file is deprecated. Please use only the AIRFLOW_HOME environment variable and remove the config file entry.
  warnings.warn(msg, category=DeprecationWarning)
Starting flask
 * Serving Flask app "airflow.utils.serve_logs" (lazy loading)
 * Environment: production
   WARNING: This is a development server. Do not use it in a production deployment.
   Use a production WSGI server instead.
 * Debug mode: off
[2021-05-26 08:37:48,027] {_internal.py:113} INFO -  * Running on http://0.0.0.0:8793/ (Press CTRL+C to quit)
/home/airflow/.local/lib/python3.7/site-packages/celery/platforms.py:801 RuntimeWarning: You're running the worker with superuser privileges: this is
absolutely not recommended!

Please specify a different user using the --uid option.

User information: uid=1000 euid=1000 gid=0 egid=0

[2021-05-26 08:37:49,557: INFO/MainProcess] Connected to redis://redis:6379/0
[2021-05-26 08:37:49,567: INFO/MainProcess] mingle: searching for neighbors
[2021-05-26 08:37:50,587: INFO/MainProcess] mingle: sync with 3 nodes
[2021-05-26 08:37:50,587: INFO/MainProcess] mingle: sync complete
[2021-05-26 08:37:50,601: INFO/MainProcess] celery@fcd56490a11f ready.
[2021-05-26 08:37:55,296: INFO/MainProcess] Events of group {task} enabled by remote.

worker: Warm shutdown (MainProcess)

 -------------- celery@fcd56490a11f v4.4.7 (cliffs)
--- ***** -----
-- ******* ---- Linux-5.4.0-1045-aws-x86_64-with-debian-10.8 2021-05-26 08:37:48
- *** --- * ---
- ** ---------- [config]
- ** ---------- .> app:         airflow.executors.celery_executor:0x7f951e9d3fd0
- ** ---------- .> transport:   redis://redis:6379/0
- ** ---------- .> results:     postgresql://airflow:**@postgres/airflow
- *** --- * --- .> concurrency: 16 (prefork)
-- ******* ---- .> task events: OFF (enable -E to monitor tasks in this worker)
--- ***** -----
 -------------- [queues]
                .> default          exchange=default(direct) key=default


[tasks]
  . airflow.executors.celery_executor.execute_command

All docker services in my docker stack on the manager node are running fine, and so is the selenium service on the remote node. Following the docker-compose setup for Airflow here, I developed the docker-compose file shown below.

Postgres, Redis and Selenium are standard images.

For the airflow services there are two images:

  1. airflow-manager: this is just the name of the image that is built locally when the containers are started.

  2. localhost:5000/myadmin/airflow-remote: the same image pushed to a local registry so that it can be seen from the other machines.
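
For reference, this is roughly how such an image would be built, tagged and pushed to the local registry (a sketch; the image names are the ones from the compose file below, the rest is illustrative):

# build the image locally for the manager-node services
docker build -t airflow-manager .
# tag and push the same image to the local registry so remote nodes can pull it
docker tag airflow-manager localhost:5000/myadmin/airflow-remote
docker push localhost:5000/myadmin/airflow-remote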

docker-compose.yaml:

version: '3.7'

services:
  postgres:
    image: postgres:13
    env_file:
      - ./config/postgres_test.env
    ports:
      - 5432:5432
    volumes:
      - postgres-db-volume:/var/lib/postgresql/data
    healthcheck:
      test: ["CMD", "pg_isready", "-d", "postgres", "-U", "airflow"]
      interval: 5s
      retries: 5
    restart: always
    depends_on: []
    deploy:
      placement:
        constraints: [ node.role == manager ]

  redis:
    image: redis:latest
    env_file:
      - ./config/postgres_test.env
    ports:
      - 6379:6379
    healthcheck:
      test: ["CMD", "redis-cli", "ping"]
      interval: 5s
      timeout: 30s
      retries: 50
    restart: always
    depends_on: []
    deploy:
      placement:
        constraints: [ node.role == manager ]

  airflow-webserver:
    image: airflow-manager
    build:
      context: .
      dockerfile: Dockerfile
    env_file:
      - ./config/airflow.env
      - ./config/postgres_test.env
    volumes:
      - ./:/opt/airflow
    user: "${AIRFLOW_UID:-1000}:${AIRFLOW_GID:-0}"
    command: webserver
    ports:
      - 8080:8080
    healthcheck:
      test: ["CMD", "curl", "--fail", "http://localhost:8080/health"]
      interval: 10s
      timeout: 10s
      retries: 5
    restart: always
    depends_on:
      - airflow-init
    deploy:
      placement:
        constraints: [ node.role == manager ]

  airflow-scheduler:
    image: airflow-manager
    build:
      context: .
      dockerfile: Dockerfile
    env_file:
      - ./config/airflow.env
      - ./config/postgres_test.env
    volumes:
      - ./:/opt/airflow
    user: "${AIRFLOW_UID:-1000}:${AIRFLOW_GID:-0}"
    command: scheduler
    restart: always
    depends_on:
      - airflow-init
    deploy:
      placement:
        constraints: [ node.role == manager ]

  airflow-worker-manager:
    image: airflow-manager
    build:
      context: .
      dockerfile: Dockerfile
    env_file:
      - ./config/airflow.env
      - ./config/postgres_test.env
    volumes:
      - ./:/opt/airflow
    user: "${AIRFLOW_UID:-1000}:${AIRFLOW_GID:-0}"
    command: celery worker
    restart: always
    ports:
      - 8794:8080
    depends_on:
      - airflow-scheduler
      - airflow-webserver
      - airflow-init
    deploy:
      placement:
        constraints: [ node.role == manager ]

  airflow-worker-remote:
    image: localhost:5000/myadmin/airflow-remote
    build:
      context: .
      dockerfile: Dockerfile
    env_file:
      - ./config/airflow.env
      - ./config/postgres_test.env
    volumes:
      - ./:/opt/airflow
    user: "${AIRFLOW_UID:-1000}:${AIRFLOW_GID:-0}"
    command: celery worker
    restart: always
    ports:
      - 8795:8080
    depends_on:
      - airflow-scheduler
      - airflow-webserver
      - airflow-init
    deploy:
      placement:
        constraints: [ node.role == worker ]

  airflow-init:
    image: airflow-manager
    build:
      context: .
      dockerfile: Dockerfile
    env_file:
      - ./config/airflow.env
      - ./config/postgres_test.env
      - ./config/init.env
    volumes:
      - ./:/opt/airflow
    user: "${AIRFLOW_UID:-1000}:${AIRFLOW_GID:-0}"
    command: version
    depends_on:
      - postgres
      - redis
    deploy:
      placement:
        constraints: [ node.role == manager ]

  flower:
    image: airflow-manager
    build:
      context: .
      dockerfile: Dockerfile
    env_file:
      - ./config/airflow.env
      - ./config/postgres_test.env
    volumes:
      - ./:/opt/airflow
    user: "${AIRFLOW_UID:-1000}:${AIRFLOW_GID:-0}"
    command: celery flower
    ports:
      - 5555:5555
    healthcheck:
      test: ["CMD", "curl", "--fail", "http://localhost:5555/"]
      interval: 10s
      timeout: 10s
      retries: 5
    restart: always
    depends_on: []
    deploy:
      placement:
        constraints: [ node.role == manager ]

  selenium-chrome:
    image: selenium/standalone-chrome:latest
    ports:
      - 4444:4444
    deploy:
      placement:
        constraints: [ node.role == worker ]
    depends_on: []


volumes:
  postgres-db-volume:
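
For completeness, the stack is deployed to the swarm with docker stack deploy, roughly like this (a sketch; the stack name airflow is just an example):

# deploy/update all services of the stack on the swarm
docker stack deploy -c docker-compose.yaml airflow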


Dockerfile:


FROM apache/airflow:2.0.1-python3.7
COPY config/requirements.txt /tmp/
# pre-create world-writable cache directories (zeep and webdriver-manager caches)
RUN mkdir -p /home/airflow/.cache/zeep
RUN chmod -R 777 /home/airflow/.cache/zeep
RUN chmod -R 777 /opt/airflow/
RUN mkdir -p /home/airflow/.wdm
RUN chmod -R 777 /home/airflow/.wdm
# install additional Python dependencies
RUN pip install -r /tmp/requirements.txt

Environment files:

airflow.env:


PYTHONPATH=/opt/airflow/
AIRFLOW_UID=1000
AIRFLOW_GID=0
AIRFLOW_HOME=/opt/airflow/
AIRFLOW__CORE__AIRFLOW_HOME=/opt/airflow/
AIRFLOW__CORE__DAGS_FOLDER=/opt/airflow/dags
AIRFLOW__CORE__ENABLE_XCOM_PICKLING=true
AIRFLOW__CORE__EXECUTOR=CeleryExecutor
AIRFLOW__CORE__SQL_ALCHEMY_CONN=postgresql+psycopg2://airflow:airflow@postgres/airflow
AIRFLOW__CELERY__RESULT_BACKEND=db+postgresql://airflow:airflow@postgres/airflow
AIRFLOW__CELERY__BROKER_URL=redis://:@redis:6379/0
AIRFLOW__CORE__FERNET_KEY=****
AIRFLOW__CORE__DAGS_ARE_PAUSED_AT_CREATION=true
AIRFLOW__CORE__LOAD_EXAMPLES=false
AIRFLOW__CORE__PLUGINS_FOLDER=/plugins/
AIRFLOW__CORE__PARALLELISM=48
AIRFLOW__CORE__DAG_CONCURRENCY=8
AIRFLOW__CORE__MAX_ACTIVE_RUNS_PER_DAG=1
AIRFLOW__WEBSERVER__DAG_DEFAULT_VIEW=graph
AIRFLOW__WEBSERVER__LOG_FETCH_TIMEOUT_SEC=30
AIRFLOW__WEBSERVER__HIDE_PAUSED_DAGS_BY_DEFAULT=true
AIRFLOW__SCHEDULER__CATCHUP_BY_DEFAULT=false
CELERY_ACKS_LATE=true

postgres_test.env:


POSTGRES_USER=airflow
POSTGRES_PASSWORD=airflow
POSTGRES_HOST=postgres
POSTGRES_PORT=5432
POSTGRES_DB=airflow

init.env:

_AIRFLOW_DB_UPGRADE=true
_AIRFLOW_WWW_USER_CREATE=true
_AIRFLOW_WWW_USER_USERNAME=${_AIRFLOW_WWW_USER_USERNAME:-airflow}
_AIRFLOW_WWW_USER_PASSWORD=${_AIRFLOW_WWW_USER_PASSWORD:-airflow}

I have seen this issue solved for others by setting the env CELERY_ACKS_LATE=true, but that did not help in my case.

This is a really annoying issue, since it spams my Flower worker monitoring, and I would like to scale out to more workers running on other nodes.

Do you have any idea what is going on here? Any help is appreciated!

Thanks in advance!

Just to let you know: I was able to solve this issue by setting CELERY_WORKER_MAX_TASKS_PER_CHILD=500, which otherwise defaults to 50. Our Airflow DAG sends about 85 tasks to this worker, so it was probably overwhelmed.
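
Concretely, this amounts to one extra line in config/airflow.env next to the existing CELERY_ACKS_LATE setting:

# recycle each worker child process after 500 tasks instead of the default 50
CELERY_WORKER_MAX_TASKS_PER_CHILD=500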

Apparently celery does not accept any more incoming messages from redis, and redis shuts down the worker if the pipeline of outgoing messages gets too full.
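
One way to watch the backlog build up (a sketch, assuming the default queue name from the worker log and the redis port published in the compose file; <manager-host> is a placeholder) is to ask the broker how many messages are waiting:

# number of pending messages in the Celery "default" queue
redis-cli -h <manager-host> -p 6379 llen default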

After two of us spent days searching, we finally found the answer. Apparently it is still a workaround, but it works fine for now. I found the answer in this GitHub issue. Just wanted to let you know.

If you have any further insights, feel free to share them.