Unwanted Warm Shutdown (MainProcess) of node worker in airflow docker swarm
I am currently setting up remote workers for Apache Airflow on AWS EC2 instances via docker swarm.
The remote worker shuts down every 60 seconds for no apparent reason with the following output:
BACKEND=postgresql+psycopg2
DB_HOST=postgres
DB_PORT=5432
BACKEND=postgresql+psycopg2
DB_HOST=postgres
DB_PORT=5432
/home/airflow/.local/lib/python3.7/site-packages/airflow/configuration.py:813: DeprecationWarning: Specifying both AIRFLOW_HOME environment variable and airflow_home in the config file is deprecated. Please use only the AIRFLOW_HOME environment variable and remove the config file entry.
warnings.warn(msg, category=DeprecationWarning)
Starting flask
* Serving Flask app "airflow.utils.serve_logs" (lazy loading)
* Environment: production
WARNING: This is a development server. Do not use it in a production deployment.
Use a production WSGI server instead.
* Debug mode: off
[2021-05-26 08:37:48,027] {_internal.py:113} INFO - * Running on http://0.0.0.0:8793/ (Press CTRL+C to quit)
/home/airflow/.local/lib/python3.7/site-packages/celery/platforms.py:801 RuntimeWarning: You're running the worker with superuser privileges: this is
absolutely not recommended!
Please specify a different user using the --uid option.
User information: uid=1000 euid=1000 gid=0 egid=0
[2021-05-26 08:37:49,557: INFO/MainProcess] Connected to redis://redis:6379/0
[2021-05-26 08:37:49,567: INFO/MainProcess] mingle: searching for neighbors
[2021-05-26 08:37:50,587: INFO/MainProcess] mingle: sync with 3 nodes
[2021-05-26 08:37:50,587: INFO/MainProcess] mingle: sync complete
[2021-05-26 08:37:50,601: INFO/MainProcess] celery@fcd56490a11f ready.
[2021-05-26 08:37:55,296: INFO/MainProcess] Events of group {task} enabled by remote.
worker: Warm shutdown (MainProcess)
-------------- celery@fcd56490a11f v4.4.7 (cliffs)
--- ***** -----
-- ******* ---- Linux-5.4.0-1045-aws-x86_64-with-debian-10.8 2021-05-26 08:37:48
- *** --- * ---
- ** ---------- [config]
- ** ---------- .> app: airflow.executors.celery_executor:0x7f951e9d3fd0
- ** ---------- .> transport: redis://redis:6379/0
- ** ---------- .> results: postgresql://airflow:**@postgres/airflow
- *** --- * --- .> concurrency: 16 (prefork)
-- ******* ---- .> task events: OFF (enable -E to monitor tasks in this worker)
--- ***** -----
-------------- [queues]
.> default exchange=default(direct) key=default
[tasks]
. airflow.executors.celery_executor.execute_command
All docker services in my docker stack run fine on the manager node, and so does the selenium service on the remote node. Following Airflow's docker compose setup here, I put together the docker-compose file shown below.
Postgres, Redis and Selenium are standard images.
For the airflow services there are two images:
airflow-manager
is just the name given to the image that is built locally when the containers are started.
localhost:5000/myadmin/airflow-remote
is the same image pushed to a local registry so that it can be pulled from the other machines.
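For completeness, this is roughly how the image ends up in the local registry so the worker node can pull it (written from memory, so treat it as a sketch rather than the exact invocation):

# build the image locally, then tag and push it to the registry on the manager node
docker build -t airflow-manager .
docker tag airflow-manager localhost:5000/myadmin/airflow-remote
docker push localhost:5000/myadmin/airflow-remote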
docker-compose.yaml:
version: '3.7'
services:
  postgres:
    image: postgres:13
    env_file:
      - ./config/postgres_test.env
    ports:
      - 5432:5432
    volumes:
      - postgres-db-volume:/var/lib/postgresql/data
    healthcheck:
      test: ["CMD", "pg_isready", "-d", "postgres", "-U", "airflow"]
      interval: 5s
      retries: 5
    restart: always
    depends_on: []
    deploy:
      placement:
        constraints: [ node.role == manager ]
  redis:
    image: redis:latest
    env_file:
      - ./config/postgres_test.env
    ports:
      - 6379:6379
    healthcheck:
      test: ["CMD", "redis-cli", "ping"]
      interval: 5s
      timeout: 30s
      retries: 50
    restart: always
    depends_on: []
    deploy:
      placement:
        constraints: [ node.role == manager ]
  airflow-webserver:
    image: airflow-manager
    build:
      context: .
      dockerfile: Dockerfile
    env_file:
      - ./config/airflow.env
      - ./config/postgres_test.env
    volumes:
      - ./:/opt/airflow
    user: "${AIRFLOW_UID:-1000}:${AIRFLOW_GID:-0}"
    command: webserver
    ports:
      - 8080:8080
    healthcheck:
      test: ["CMD", "curl", "--fail", "http://localhost:8080/health"]
      interval: 10s
      timeout: 10s
      retries: 5
    restart: always
    depends_on:
      - airflow-init
    deploy:
      placement:
        constraints: [ node.role == manager ]
  airflow-scheduler:
    image: airflow-manager
    build:
      context: .
      dockerfile: Dockerfile
    env_file:
      - ./config/airflow.env
      - ./config/postgres_test.env
    volumes:
      - ./:/opt/airflow
    user: "${AIRFLOW_UID:-1000}:${AIRFLOW_GID:-0}"
    command: scheduler
    restart: always
    depends_on:
      - airflow-init
    deploy:
      placement:
        constraints: [ node.role == manager ]
  airflow-worker-manager:
    image: airflow-manager
    build:
      context: .
      dockerfile: Dockerfile
    env_file:
      - ./config/airflow.env
      - ./config/postgres_test.env
    volumes:
      - ./:/opt/airflow
    user: "${AIRFLOW_UID:-1000}:${AIRFLOW_GID:-0}"
    command: celery worker
    restart: always
    ports:
      - 8794:8080
    depends_on:
      - airflow-scheduler
      - airflow-webserver
      - airflow-init
    deploy:
      placement:
        constraints: [ node.role == manager ]
  airflow-worker-remote:
    image: localhost:5000/myadmin/airflow-remote
    build:
      context: .
      dockerfile: Dockerfile
    env_file:
      - ./config/airflow.env
      - ./config/postgres_test.env
    volumes:
      - ./:/opt/airflow
    user: "${AIRFLOW_UID:-1000}:${AIRFLOW_GID:-0}"
    command: celery worker
    restart: always
    ports:
      - 8795:8080
    depends_on:
      - airflow-scheduler
      - airflow-webserver
      - airflow-init
    deploy:
      placement:
        constraints: [ node.role == worker ]
  airflow-init:
    image: airflow-manager
    build:
      context: .
      dockerfile: Dockerfile
    env_file:
      - ./config/airflow.env
      - ./config/postgres_test.env
      - ./config/init.env
    volumes:
      - ./:/opt/airflow
    user: "${AIRFLOW_UID:-1000}:${AIRFLOW_GID:-0}"
    command: version
    depends_on:
      - postgres
      - redis
    deploy:
      placement:
        constraints: [ node.role == manager ]
  flower:
    image: airflow-manager
    build:
      context: .
      dockerfile: Dockerfile
    env_file:
      - ./config/airflow.env
      - ./config/postgres_test.env
    volumes:
      - ./:/opt/airflow
    user: "${AIRFLOW_UID:-1000}:${AIRFLOW_GID:-0}"
    command: celery flower
    ports:
      - 5555:5555
    healthcheck:
      test: ["CMD", "curl", "--fail", "http://localhost:5555/"]
      interval: 10s
      timeout: 10s
      retries: 5
    restart: always
    depends_on: []
    deploy:
      placement:
        constraints: [ node.role == manager ]
  selenium-chrome:
    image: selenium/standalone-chrome:latest
    ports:
      - 4444:4444
    deploy:
      placement:
        constraints: [ node.role == worker ]
    depends_on: []
volumes:
  postgres-db-volume:
Dockerfile:
FROM apache/airflow:2.0.1-python3.7
COPY config/requirements.txt /tmp/
RUN mkdir -p /home/airflow/.cache/zeep
RUN chmod -R 777 /home/airflow/.cache/zeep
RUN chmod -R 777 /opt/airflow/
RUN mkdir -p /home/airflow/.wdm
RUN chmod -R 777 /home/airflow/.wdm
RUN pip install -r /tmp/requirements.txt
Environment files:
airflow.env:
PYTHONPATH=/opt/airflow/
AIRFLOW_UID=1000
AIRFLOW_GID=0
AIRFLOW_HOME=/opt/airflow/
AIRFLOW__CORE__AIRFLOW_HOME=/opt/airflow/
AIRFLOW__CORE__DAGS_FOLDER=/opt/airflow/dags
AIRFLOW__CORE__ENABLE_XCOM_PICKLING=true
AIRFLOW__CORE__EXECUTOR=CeleryExecutor
AIRFLOW__CORE__SQL_ALCHEMY_CONN=postgresql+psycopg2://airflow:airflow@postgres/airflow
AIRFLOW__CELERY__RESULT_BACKEND=db+postgresql://airflow:airflow@postgres/airflow
AIRFLOW__CELERY__BROKER_URL=redis://:@redis:6379/0
AIRFLOW__CORE__FERNET_KEY=****
AIRFLOW__CORE__DAGS_ARE_PAUSED_AT_CREATION=true
AIRFLOW__CORE__LOAD_EXAMPLES=false
AIRFLOW__CORE__PLUGINS_FOLDER=/plugins/
AIRFLOW__CORE__PARALLELISM=48
AIRFLOW__CORE__DAG_CONCURRENCY=8
AIRFLOW__CORE__MAX_ACTIVE_RUNS_PER_DAG=1
AIRFLOW__WEBSERVER__DAG_DEFAULT_VIEW=graph
AIRFLOW__WEBSERVER__LOG_FETCH_TIMEOUT_SEC=30
AIRFLOW__WEBSERVER__HIDE_PAUSED_DAGS_BY_DEFAULT=true
AIRFLOW__SCHEDULER__CATCHUP_BY_DEFAULT=false
CELERY_ACKS_LATE=true
postgres_test.env:
POSTGRES_USER=airflow
POSTGRES_PASSWORD=airflow
POSTGRES_HOST=postgres
POSTGRES_PORT=5432
POSTGRES_DB=airflow
init.env:
_AIRFLOW_DB_UPGRADE=true
_AIRFLOW_WWW_USER_CREATE=true
_AIRFLOW_WWW_USER_USERNAME=${_AIRFLOW_WWW_USER_USERNAME:-airflow}
_AIRFLOW_WWW_USER_PASSWORD=${_AIRFLOW_WWW_USER_PASSWORD:-airflow}
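For reference, the whole thing is deployed to the swarm as a stack in the usual way (the stack name here is just an example):

docker stack deploy -c docker-compose.yaml airflow

Note that docker stack deploy ignores the build: sections, which is why the remote worker image has to come from the registry in the first place.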
I saw that for others this problem was solved by setting the env CELERY_ACKS_LATE=true,
but that did not help in my case.
This is a really annoying problem, because it spams my flower worker monitoring, and I want to scale out to more workers running on other nodes.
Do you have any idea what is going on here? Any help is appreciated.
Thanks in advance!
Just to let you know: I was able to solve this problem by setting
CELERY_WORKER_MAX_TASKS_PER_CHILD=500
, which otherwise defaults to 50. Our Airflow DAG sends roughly 85 tasks to this worker, so it was probably overwhelmed.
Apparently celery then stops accepting further incoming messages from redis, and redis shuts the worker down once the outgoing message pipeline is full.
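Concretely, the fix was a single extra environment variable for the worker containers; in my setup it went into the same env file that already carries CELERY_ACKS_LATE (where exactly you put it shouldn't matter, as long as the worker process sees it):

# cap the number of tasks a worker child process handles before it is replaced
CELERY_WORKER_MAX_TASKS_PER_CHILD=500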
After two person-days of searching we found this answer. Admittedly it is still a workaround, but it works fine for now.
I found the answer in this github issue.
Just wanted to let you know.
If you have any further insights, please feel free to share them.