如何使用 docker-compose 在分布式气流架构上配置 celery worker?
How to configure celery worker on distributed airflow architecture using docker-compose?
我正在建立一个分布式 Airflow 集群,其中除了 celery workers 之外的所有其他东西都在一台主机上 运行 并且处理是在多台主机上完成的。 airflow2.0 设置使用 Airflow 文档 https://airflow.apache.org/docs/apache-airflow/stable/docker-compose.yaml 中给出的 yaml 文件进行配置。在我最初的测试中,当我 运行 一切都在同一个主机上时,我的架构可以很好地工作。问题是,如何在远程主机上启动 celery worker?
到目前为止,我尝试创建上述 docker-compose 的修剪版本,我只在工作主机上启动 celery 工作人员,没有其他。但是我 运行 遇到了数据库连接的一些问题。在修整后的版本中,我更改了 URL 以便它们指向 运行 数据库和 redis 的主机。
dags、日志、插件和 postgresql 数据库位于所有主机可见的共享驱动器上。
我该如何配置?任何想法要检查什么?连接等?
Celery worker docker-compose 配置:
---
version: '3'
x-airflow-common:
&airflow-common
image: ${AIRFLOW_IMAGE_NAME:-apache/airflow:2.1.0}
environment:
&airflow-common-env
AIRFLOW_UID: 50000
AIRFLOW_GID: 50000
AIRFLOW__CORE__EXECUTOR: CeleryExecutor
AIRFLOW__CORE__SQL_ALCHEMY_CONN:
postgresql+psycopg2://airflow:airflow@airflowhost.example.com:8080/airflow
AIRFLOW__CELERY__RESULT_BACKEND: db+postgresql://airflow:airflow@airflow@airflowhost.example.com:8080/airflow
AIRFLOW__CELERY__BROKER_URL: redis://:@airflow@airflowhost.example.com:6380/0
AIRFLOW__CORE__FERNET_KEY: ''
AIRFLOW__CORE__DAGS_ARE_PAUSED_AT_CREATION: 'true'
AIRFLOW__CORE__LOAD_EXAMPLES: 'true'
AIRFLOW__API__AUTH_BACKEND: 'airflow.api.auth.backend.basic_auth'
REDIS_PORT: 6380
volumes:
- /airflow/dev/dags:/opt/airflow/dags
- /airflow/dev/logs:/opt/airflow/logs
- /airflow/dev/plugins:/opt/airflow/plugins
user: "${AIRFLOW_UID:-50000}:${AIRFLOW_GID:-50000}"
services:
airflow-remote-worker:
<<: *airflow-common
command: celery worker
healthcheck:
test:
- "CMD-SHELL"
- 'celery --app airflow.executors.celery_executor.app inspect ping -d "celery@$${HOSTNAME}"'
interval: 10s
timeout: 10s
retries: 5
restart: always
编辑 1:
我在使用日志文件时仍然遇到一些困难。似乎共享日志目录并不能解决丢失日志文件的问题。我按照建议在 main 上添加了 extra_host 定义,并在 worker 机器上打开了端口 8793。
工作任务失败并显示日志:
*** Log file does not exist:
/opt/airflow/logs/tutorial/print_date/2021-07-
01T13:57:11.087882+00:00/1.log
*** Fetching from: http://:8793/log/tutorial/print_date/2021-07-01T13:57:11.087882+00:00/1.log
*** Failed to fetch log file from worker. Unsupported URL protocol ''
远非“最终设置”,这些设置对我有用,在核心节点和工作人员中使用 docker-compose 来自 Airflow:
主节点:
必须可以从运行 Webserver
的主节点访问工作节点。我发现 CeleryExecutor
架构的 this diagram 对解决问题很有帮助。
尝试读取日志时,如果在本地找不到它们,它将尝试从远程工作人员那里检索它们。因此,您的主节点可能不知道您的工作人员的主机名,因此您可以更改主机名的解析方式(hostname_callable
设置,默认为 socket.getfqdn
),或者您只需将名称解析功能添加到Webserver
。这可以通过在 x-airflow-common
定义中添加 extra_hosts
配置键来完成:
---
version: "3"
x-airflow-common: &airflow-common
image: ${AIRFLOW_IMAGE_NAME:-apache/airflow:2.1.0}
environment: &airflow-common-env
...# env vars
extra_hosts:
- "worker-01-hostname:worker-01-ip-address" # "worker-01-hostname:192.168.0.11"
- "worker-02-hostname:worker-02-ip-address"
*请注意,在您有共享驱动器的特定情况下,我认为日志将在本地找到。
- 定义并行性、DAG 并发 和调度程序解析进程。可以通过使用环境变量来完成:
x-airflow-common: &airflow-common
image: ${AIRFLOW_IMAGE_NAME:-apache/airflow:2.1.0}
environment: &airflow-common-env
AIRFLOW__CORE__PARALLELISM: 64
AIRFLOW__CORE__DAG_CONCURRENCY: 32
AIRFLOW__SCHEDULER__PARSING_PROCESSES: 4
当然,要设置的值取决于您的具体情况和可用资源。 This article 对主题有很好的概述。 DAG 设置 也可以在 DAG
定义中被覆盖。
工作节点:
定义 worker CELERY__WORKER_CONCURRENCY
,默认可以是机器上可用的 CPU 数量 (docs)。
定义如何到达主节点中的服务运行。设置 IP 或主机名并注意主节点中匹配的暴露端口:
x-airflow-common: &airflow-common
image: ${AIRFLOW_IMAGE_NAME:-apache/airflow:2.1.0}
environment: &airflow-common-env
AIRFLOW__CORE__EXECUTOR: CeleryExecutor
AIRFLOW__CELERY__WORKER_CONCURRENCY: 8
AIRFLOW__CORE__SQL_ALCHEMY_CONN: postgresql+psycopg2://airflow:airflow@main_node_ip_or_hostname:5432/airflow # 5432 is default postgres port
AIRFLOW__CELERY__RESULT_BACKEND: db+postgresql://airflow:airflow@main_node_ip_or_hostname:5432/airflow
AIRFLOW__CELERY__BROKER_URL: redis://:@main_node_ip_or_hostname:6379/0
- 共享相同的 Fernet Key and Secret Key 从“.env”文件读取它们:
environment: &airflow-common-env
AIRFLOW__CORE__FERNET_KEY: ${FERNET_KEY}
AIRFLOW__WEBSERVER__SECRET_KEY: ${SECRET_KEY}
env_file:
- .env
.env 文件:FERNET_KEY=jvYUaxxxxxxxxxxxxx=
关键集群中的每个节点(主节点和工作节点)都应用了相同的设置。
为辅助服务定义一个主机名,以避免自动生成匹配容器 ID。
公开端口 8793,这是用于从 worker (docs) 获取日志的默认端口:
services:
airflow-worker:
<<: *airflow-common
hostname: ${HOSTNAME}
ports:
- 8793:8793
command: celery worker
restart: always
- 确保每个工作节点主机都是运行相同的时间配置,几分钟的差异可能会导致严重的执行错误,这可能不容易被发现。考虑在主机 OS.
上启用 NTP 服务
如果您通常有繁重的工作负载和高并发性,您可能需要调整 Postgres 设置,例如 max_connections
和 shared_buffers
。这同样适用于主机 OS 网络设置,例如 ip_local_port_range
或 somaxconn
.
在初始集群设置期间我遇到的任何问题,Flower
和 worker 执行日志总是提供有用的详细信息和错误消息,包括任务级日志和 Docker-Compose 服务日志即:docker-compose logs --tail=10000 airflow-worker > worker_logs.log
.
希望对你有用!
我正在建立一个分布式 Airflow 集群,其中除了 celery workers 之外的所有其他东西都在一台主机上 运行 并且处理是在多台主机上完成的。 airflow2.0 设置使用 Airflow 文档 https://airflow.apache.org/docs/apache-airflow/stable/docker-compose.yaml 中给出的 yaml 文件进行配置。在我最初的测试中,当我 运行 一切都在同一个主机上时,我的架构可以很好地工作。问题是,如何在远程主机上启动 celery worker?
到目前为止,我尝试创建上述 docker-compose 的修剪版本,我只在工作主机上启动 celery 工作人员,没有其他。但是我 运行 遇到了数据库连接的一些问题。在修整后的版本中,我更改了 URL 以便它们指向 运行 数据库和 redis 的主机。
dags、日志、插件和 postgresql 数据库位于所有主机可见的共享驱动器上。
我该如何配置?任何想法要检查什么?连接等? Celery worker docker-compose 配置:
---
version: '3'
x-airflow-common:
&airflow-common
image: ${AIRFLOW_IMAGE_NAME:-apache/airflow:2.1.0}
environment:
&airflow-common-env
AIRFLOW_UID: 50000
AIRFLOW_GID: 50000
AIRFLOW__CORE__EXECUTOR: CeleryExecutor
AIRFLOW__CORE__SQL_ALCHEMY_CONN:
postgresql+psycopg2://airflow:airflow@airflowhost.example.com:8080/airflow
AIRFLOW__CELERY__RESULT_BACKEND: db+postgresql://airflow:airflow@airflow@airflowhost.example.com:8080/airflow
AIRFLOW__CELERY__BROKER_URL: redis://:@airflow@airflowhost.example.com:6380/0
AIRFLOW__CORE__FERNET_KEY: ''
AIRFLOW__CORE__DAGS_ARE_PAUSED_AT_CREATION: 'true'
AIRFLOW__CORE__LOAD_EXAMPLES: 'true'
AIRFLOW__API__AUTH_BACKEND: 'airflow.api.auth.backend.basic_auth'
REDIS_PORT: 6380
volumes:
- /airflow/dev/dags:/opt/airflow/dags
- /airflow/dev/logs:/opt/airflow/logs
- /airflow/dev/plugins:/opt/airflow/plugins
user: "${AIRFLOW_UID:-50000}:${AIRFLOW_GID:-50000}"
services:
airflow-remote-worker:
<<: *airflow-common
command: celery worker
healthcheck:
test:
- "CMD-SHELL"
- 'celery --app airflow.executors.celery_executor.app inspect ping -d "celery@$${HOSTNAME}"'
interval: 10s
timeout: 10s
retries: 5
restart: always
编辑 1: 我在使用日志文件时仍然遇到一些困难。似乎共享日志目录并不能解决丢失日志文件的问题。我按照建议在 main 上添加了 extra_host 定义,并在 worker 机器上打开了端口 8793。 工作任务失败并显示日志:
*** Log file does not exist:
/opt/airflow/logs/tutorial/print_date/2021-07-
01T13:57:11.087882+00:00/1.log
*** Fetching from: http://:8793/log/tutorial/print_date/2021-07-01T13:57:11.087882+00:00/1.log
*** Failed to fetch log file from worker. Unsupported URL protocol ''
远非“最终设置”,这些设置对我有用,在核心节点和工作人员中使用 docker-compose 来自 Airflow:
主节点:
必须可以从运行
Webserver
的主节点访问工作节点。我发现CeleryExecutor
架构的 this diagram 对解决问题很有帮助。尝试读取日志时,如果在本地找不到它们,它将尝试从远程工作人员那里检索它们。因此,您的主节点可能不知道您的工作人员的主机名,因此您可以更改主机名的解析方式(
hostname_callable
设置,默认为socket.getfqdn
),或者您只需将名称解析功能添加到Webserver
。这可以通过在x-airflow-common
定义中添加extra_hosts
配置键来完成:
---
version: "3"
x-airflow-common: &airflow-common
image: ${AIRFLOW_IMAGE_NAME:-apache/airflow:2.1.0}
environment: &airflow-common-env
...# env vars
extra_hosts:
- "worker-01-hostname:worker-01-ip-address" # "worker-01-hostname:192.168.0.11"
- "worker-02-hostname:worker-02-ip-address"
*请注意,在您有共享驱动器的特定情况下,我认为日志将在本地找到。
- 定义并行性、DAG 并发 和调度程序解析进程。可以通过使用环境变量来完成:
x-airflow-common: &airflow-common
image: ${AIRFLOW_IMAGE_NAME:-apache/airflow:2.1.0}
environment: &airflow-common-env
AIRFLOW__CORE__PARALLELISM: 64
AIRFLOW__CORE__DAG_CONCURRENCY: 32
AIRFLOW__SCHEDULER__PARSING_PROCESSES: 4
当然,要设置的值取决于您的具体情况和可用资源。 This article 对主题有很好的概述。 DAG 设置 也可以在 DAG
定义中被覆盖。
工作节点:
定义 worker
CELERY__WORKER_CONCURRENCY
,默认可以是机器上可用的 CPU 数量 (docs)。定义如何到达主节点中的服务运行。设置 IP 或主机名并注意主节点中匹配的暴露端口:
x-airflow-common: &airflow-common
image: ${AIRFLOW_IMAGE_NAME:-apache/airflow:2.1.0}
environment: &airflow-common-env
AIRFLOW__CORE__EXECUTOR: CeleryExecutor
AIRFLOW__CELERY__WORKER_CONCURRENCY: 8
AIRFLOW__CORE__SQL_ALCHEMY_CONN: postgresql+psycopg2://airflow:airflow@main_node_ip_or_hostname:5432/airflow # 5432 is default postgres port
AIRFLOW__CELERY__RESULT_BACKEND: db+postgresql://airflow:airflow@main_node_ip_or_hostname:5432/airflow
AIRFLOW__CELERY__BROKER_URL: redis://:@main_node_ip_or_hostname:6379/0
- 共享相同的 Fernet Key and Secret Key 从“.env”文件读取它们:
environment: &airflow-common-env
AIRFLOW__CORE__FERNET_KEY: ${FERNET_KEY}
AIRFLOW__WEBSERVER__SECRET_KEY: ${SECRET_KEY}
env_file:
- .env
.env 文件:FERNET_KEY=jvYUaxxxxxxxxxxxxx=
关键集群中的每个节点(主节点和工作节点)都应用了相同的设置。
为辅助服务定义一个主机名,以避免自动生成匹配容器 ID。
公开端口 8793,这是用于从 worker (docs) 获取日志的默认端口:
services:
airflow-worker:
<<: *airflow-common
hostname: ${HOSTNAME}
ports:
- 8793:8793
command: celery worker
restart: always
- 确保每个工作节点主机都是运行相同的时间配置,几分钟的差异可能会导致严重的执行错误,这可能不容易被发现。考虑在主机 OS. 上启用 NTP 服务
如果您通常有繁重的工作负载和高并发性,您可能需要调整 Postgres 设置,例如 max_connections
和 shared_buffers
。这同样适用于主机 OS 网络设置,例如 ip_local_port_range
或 somaxconn
.
在初始集群设置期间我遇到的任何问题,Flower
和 worker 执行日志总是提供有用的详细信息和错误消息,包括任务级日志和 Docker-Compose 服务日志即:docker-compose logs --tail=10000 airflow-worker > worker_logs.log
.
希望对你有用!