如何设置 Airflow worker,使 webserver 能够从运行在另一台机器上的 Docker 容器中获取日志?

How to set up airflow worker to allow webserver fetch logs on different machine with docker?

我最近刚用 docker 容器安装了 airflow 2.1.4,并且已经使用 docker-compose 在同一台机器上成功部署了 postgres、redis、scheduler、2 个本地 worker 和 flower。

现在想扩容,在其他机器上设置worker

我能够让 worker 启动并运行,flower 能够发现这些 worker 节点,worker 也能从 scheduler 正确接收任务;但无论任务的实际结果如何,任务都会被标记为失败,并显示如下错误消息:

*** Log file does not exist: /opt/airflow/logs/test/test/2021-10-29T14:38:37.669734+00:00/1.log
*** Fetching from: http://b7a0154e7e20:8793/log/test/test/2021-10-29T14:38:37.669734+00:00/1.log
*** Failed to fetch log file from worker. [Errno -3] Temporary failure in name resolution

然后我尝试用 AIRFLOW__CORE__HOSTNAME_CALLABLE: 'airflow.utils.net.get_host_ip_address'

替换 AIRFLOW__CORE__HOSTNAME_CALLABLE: 'socket.getfqdn'

我得到了这个错误:

*** Log file does not exist: /opt/airflow/logs/test/test/2021-10-28T15:47:59.625675+00:00/1.log
*** Fetching from: http://172.18.0.2:8793/log/test/test/2021-10-28T15:47:59.625675+00:00/1.log
*** Failed to fetch log file from worker. [Errno 113] No route to host

然后我尝试将 worker 的端口 8793 映射到它的主机(在下面的 worker_4 中),现在它返回了:

*** Failed to fetch log file from worker. [Errno 111] Connection refused

但有时仍然会出现“名称解析暂时失败”的错误。

我也尝试复制错误中的URL,并将IP替换为主机ip,得到了这样的信息:

Forbidden
You don't have the permission to access the requested resource. It is either read-protected or not readable by the server.

如果需要更多信息,请告诉我。

提前致谢!

下面是我用于 scheduler/webserver/flower 的 docker-compose.yml:

# Compose file format version; the x-* extension fields used in this file
# need format 3.4 or later.
version: "3.4"

# Host-name -> IP entries, reused through the *extra_hosts alias as the
# `extra_hosts` value of the Airflow services.
x-hosts: &extra_hosts
  postgres: XX.X.XX.XXX
  redis: XX.X.XX.XXX

# Settings shared by the Airflow services; a service pulls them in with
# `<<: *airflow-common`, or only the environment with `<<: *airflow-common-env`.
x-airflow-common:
  &airflow-common
  image: ${AIRFLOW_IMAGE_NAME:-apache/airflow:2.1.4}
  environment:
    &airflow-common-env
    AIRFLOW__CORE__EXECUTOR: CeleryExecutor
    AIRFLOW__CORE__SQL_ALCHEMY_CONN: postgresql+psycopg2://airflow:airflow@postgres/airflow
    AIRFLOW__CELERY__RESULT_BACKEND: db+postgresql://airflow:airflow@postgres/airflow
    AIRFLOW__CELERY__BROKER_URL: redis://:@redis:6379/0
    AIRFLOW__CORE__FERNET_KEY: ''
    AIRFLOW__CORE__DAGS_ARE_PAUSED_AT_CREATION: 'true'
    AIRFLOW__CORE__LOAD_EXAMPLES: 'false'
    AIRFLOW__CORE__DEFAULT_TIMEZONE: 'America/New_York'
    # Components register under the address this callable returns; the
    # webserver then fetches task logs from http://<that address>:8793.
    AIRFLOW__CORE__HOSTNAME_CALLABLE: 'airflow.utils.net.get_host_ip_address'
    # Airflow only reads AIRFLOW__<SECTION>__<KEY> (double underscores); the
    # former single-underscore AIRFLOW_WEBSERVER_DEFAULT_UI_TIMEZONE was ignored.
    AIRFLOW__WEBSERVER__DEFAULT_UI_TIMEZONE: 'America/New_York'
    AIRFLOW__API__AUTH_BACKEND: 'airflow.api.auth.backend.basic_auth'
    _PIP_ADDITIONAL_REQUIREMENTS: ${_PIP_ADDITIONAL_REQUIREMENTS:- apache-airflow-providers-slack}
  volumes:
    - ./dags:/opt/airflow/dags
    - ./logs:/opt/airflow/logs
    - ./plugins:/opt/airflow/plugins
    - ./assets:/opt/airflow/assets
    # NOTE(review): the webserver authorizes its log requests to workers with
    # [webserver] secret_key, so that key has to be identical on every
    # machine - presumably it is set in this mounted airflow.cfg; confirm.
    - ./airflow.cfg:/opt/airflow/airflow.cfg
    - /etc/hostname:/etc/hostname
  user: "${AIRFLOW_UID:-50000}:${AIRFLOW_GID:-0}"
  extra_hosts: *extra_hosts


services:
  # PostgreSQL instance behind the Airflow connection strings
  # (SQL_ALCHEMY_CONN / RESULT_BACKEND); published on host port 5432.
  postgres:
    image: postgres:13
    container_name: "airflow-postgres"
    restart: always
    environment:
      POSTGRES_DB: airflow
      POSTGRES_USER: airflow
      POSTGRES_PASSWORD: airflow
    ports:
      - "5432:5432"
    volumes:
      # Keeps the database files on the host across container re-creation.
      - ./data/postgres:/var/lib/postgresql/data
    healthcheck:
      test:
        - "CMD"
        - "pg_isready"
        - "-U"
        - "airflow"
      interval: 5s
      retries: 5

  # Redis instance used as the Celery broker (see AIRFLOW__CELERY__BROKER_URL);
  # published on host port 6379.
  redis:
    container_name: "airflow-redis"
    image: redis:latest
    restart: always
    expose:
      - 6379
    ports:
      - "6379:6379"
    healthcheck:
      test:
        - "CMD"
        - "redis-cli"
        - "ping"
      interval: 5s
      timeout: 30s
      retries: 50
    

  # Airflow web UI. To show a task log that is not available locally it
  # fetches the file over HTTP from port 8793 of the address the worker
  # registered under.
  airflow-webserver:
    <<: *airflow-common
    container_name: 'airflow-webserver'
    command: webserver
    ports:
      # Quoted like the postgres/redis mappings so the value is always read
      # as a string.
      - '8080:8080'
    healthcheck:
      test: ["CMD", "curl", "--fail", "http://localhost:8080/health"]
      interval: 10s
      timeout: 10s
      retries: 5
    restart: always
    depends_on:
      - redis
      - postgres

  # Airflow scheduler, configured from the shared *airflow-common settings.
  airflow-scheduler:
    <<: *airflow-common
    command: scheduler
    container_name: "airflow-scheduler"
    restart: always
    depends_on:
      - redis
      - postgres
    healthcheck:
      # NOTE(review): this filters scheduler jobs by the container's
      # $HOSTNAME, while AIRFLOW__CORE__HOSTNAME_CALLABLE makes components
      # register under an IP address - confirm the check can still match.
      test:
        - "CMD-SHELL"
        - 'airflow jobs check --job-type SchedulerJob --hostname "$${HOSTNAME}"'
      interval: 10s
      timeout: 10s
      retries: 5

  # Celery worker 1, running on the same machine as the scheduler and
  # webserver and built from ./worker_config.
  airflow-worker1:
    build: ./worker_config
    container_name: 'airflow-worker_1'
    # -H sets the Celery host name, so this worker's node name is
    # celery@worker_1.
    command: celery worker -H worker_1
    healthcheck:
      test:
        - "CMD-SHELL"
        # Ping the node name set with -H above. The previous target,
        # "celery@$${HOSTNAME}", addressed the container's host name, which
        # is not the name this worker answers to.
        - 'celery --app airflow.executors.celery_executor.app inspect ping -d "celery@worker_1"'
      interval: 10s
      timeout: 10s
      retries: 5
    environment:
      <<: *airflow-common-env
      DUMB_INIT_SETSID: "0"
    restart: always
    depends_on:
      - redis
      - postgres
    volumes:
      - ./dags:/opt/airflow/dags
      - ./logs:/opt/airflow/logs
      - ./plugins:/opt/airflow/plugins
      - ./assets:/opt/airflow/assets
      - ./airflow.cfg:/opt/airflow/airflow.cfg
    extra_hosts: *extra_hosts

  # Celery worker 2, running on the same machine as the scheduler and
  # webserver and built from ./worker_config.
  airflow-worker2:
    build: ./worker_config
    container_name: 'airflow-worker_2'
    # -H sets the Celery host name, so this worker's node name is
    # celery@worker_2.
    command: celery worker -H worker_2
    healthcheck:
      test:
        - "CMD-SHELL"
        # Ping the node name set with -H above. The previous target,
        # "celery@$${HOSTNAME}", addressed the container's host name, which
        # is not the name this worker answers to.
        - 'celery --app airflow.executors.celery_executor.app inspect ping -d "celery@worker_2"'
      interval: 10s
      timeout: 10s
      retries: 5
    environment:
      <<: *airflow-common-env
      DUMB_INIT_SETSID: "0"
    restart: always
    depends_on:
      - redis
      - postgres
    volumes:
      - ./dags:/opt/airflow/dags
      - ./logs:/opt/airflow/logs
      - ./plugins:/opt/airflow/plugins
      - ./assets:/opt/airflow/assets
      - ./airflow.cfg:/opt/airflow/airflow.cfg
    extra_hosts: *extra_hosts

  # Flower, the Celery monitoring UI, published on host port 5555.
  flower:
    <<: *airflow-common
    container_name: 'airflow_flower'
    command: celery flower
    ports:
      # Quoted like the postgres/redis mappings so the value is always read
      # as a string.
      - '5555:5555'
    healthcheck:
      test: ["CMD", "curl", "--fail", "http://localhost:5555/"]
      interval: 10s
      timeout: 10s
      retries: 5
    restart: always
    depends_on:
      - redis
      - postgres

以下是另一台机器上 worker 使用的 docker-compose.yml:

# Compose file format version; the x-* extension fields used in this file
# need format 3.4 or later.
version: "3.4"

# Host-name -> IP entries, reused through the *extra_hosts alias so the
# workers on this machine can resolve `postgres` and `redis`.
x-hosts: &extra_hosts
  postgres: XX.X.XX.XXX
  redis: XX.X.XX.XXX

# Shared Airflow settings for the remote workers. The services visible in
# this file only merge the environment (*airflow-common-env); the volumes,
# user and extra_hosts keys here are not merged into any of them.
x-airflow-common:
  &airflow-common
  environment:
    &airflow-common-env
    AIRFLOW__CORE__EXECUTOR: CeleryExecutor
    AIRFLOW__CORE__SQL_ALCHEMY_CONN: postgresql+psycopg2://airflow:airflow@postgres/airflow
    AIRFLOW__CELERY__RESULT_BACKEND: db+postgresql://airflow:airflow@postgres/airflow
    AIRFLOW__CELERY__BROKER_URL: redis://:@redis:6379/0
    AIRFLOW__CORE__FERNET_KEY: ''
    AIRFLOW__CORE__DAGS_ARE_PAUSED_AT_CREATION: 'true'
    AIRFLOW__CORE__LOAD_EXAMPLES: 'false'
    AIRFLOW__CORE__DEFAULT_TIMEZONE: 'America/New_York'
    # Workers register under the address this callable returns; the
    # webserver then fetches task logs from http://<that address>:8793.
    # NOTE(review): inside a bridge-networked container this is the
    # container's private IP, which another machine cannot route to -
    # confirm the registered address is reachable from the webserver host.
    AIRFLOW__CORE__HOSTNAME_CALLABLE: 'airflow.utils.net.get_host_ip_address'
    # Airflow only reads AIRFLOW__<SECTION>__<KEY> (double underscores); the
    # former single-underscore AIRFLOW_WEBSERVER_DEFAULT_UI_TIMEZONE was ignored.
    AIRFLOW__WEBSERVER__DEFAULT_UI_TIMEZONE: 'America/New_York'
    AIRFLOW__API__AUTH_BACKEND: 'airflow.api.auth.backend.basic_auth'
  volumes:
    - ./dags:/opt/airflow/dags
    - ./logs:/opt/airflow/logs
    - ./plugins:/opt/airflow/plugins
    - ./assets:/opt/airflow/assets
    - ./airflow.cfg:/opt/airflow/airflow.cfg
    - /etc/hostname:/etc/hostname
  user: "${AIRFLOW_UID:-50000}:${AIRFLOW_GID:-0}"
  extra_hosts: *extra_hosts

services:
  # Remote Celery worker built from ./worker_config; its Celery host name is
  # set to worker_3 through -H.
  # NOTE(review): unlike worker_4 this service does not publish port 8793,
  # the port the webserver fetches task logs from - confirm that is intended.
  worker_3:
    build: ./worker_config
    entrypoint: airflow celery worker -H worker_3
    restart: always
    environment:
      <<: *airflow-common-env
      WORKER_NAME: worker_147
    extra_hosts: *extra_hosts
    volumes:
      - ./airflow.cfg:/opt/airflow/airflow.cfg
      - ./dags:/opt/airflow/dags
      - ./assets:/opt/airflow/assets
      - ./logs:/opt/airflow/logs
      - /etc/hostname:/etc/hostname
    healthcheck:
      # Healthy while the worker pid file exists.
      # NOTE(review): the volumes above mount into /opt/airflow, but this
      # looks for the pid file under /usr/local/airflow - confirm the path
      # matches the image's AIRFLOW_HOME.
      test:
        - "CMD-SHELL"
        - "[ -f /usr/local/airflow/airflow-worker.pid ]"
      interval: 30s
      timeout: 30s
      retries: 3

  # Remote Celery worker built from ./worker_config_py2; it consumes only the
  # `py2` queue (-q) and its Celery host name is set to worker_4_py2 (-H).
  worker_4:
    build: ./worker_config_py2
    restart: always
    extra_hosts: *extra_hosts
    volumes:
      - ./airflow.cfg:/opt/airflow/airflow.cfg
      - ./dags:/opt/airflow/dags
      - ./assets:/opt/airflow/assets
      - ./logs:/opt/airflow/logs
      - /etc/hostname:/etc/hostname
    entrypoint: airflow celery worker -H worker_4_py2 -q py2
    environment:
      <<: *airflow-common-env
      WORKER_NAME: worker_4_py2
    healthcheck:
      # Healthy while the worker pid file exists.
      # NOTE(review): the volumes above mount into /opt/airflow, but this
      # looks for the pid file under /usr/local/airflow - confirm the path
      # matches the image's AIRFLOW_HOME.
      test: ['CMD-SHELL', '[ -f /usr/local/airflow/airflow-worker.pid ]']
      interval: 30s
      timeout: 30s
      retries: 3
    ports:
      # Publishes the port the webserver fetches task logs from. Quoted so
      # the mapping is always read as a string.
      - '8793:8793'

对于此问题:“无法从 worker 获取日志文件。[Errno -3] 名称解析暂时失败”

看起来 worker 的主机名没有被正确解析。master 上的 webserver 需要到 worker 上拉取日志并显示在前端页面上,这个过程需要先解析 worker 的主机名。显然这里无法解析该主机名,因此需要在 master 的 /etc/hosts 中(例如执行 vim /etc/hosts)添加 worker 主机名到 IP 的映射。
  1. 您需要拥有将在除消息代理、元数据库和工作监视器之外的所有容器中使用的映像。以下是 Docker 文件。

2. 如果使用 LocalExecutor,scheduler 和 webserver 必须位于同一台主机上。

Docker 文件:

# Image for the Airflow containers, based on puckel/docker-airflow 1.10.9.
FROM puckel/docker-airflow:1.10.9
# Ship the project's Airflow configuration inside the image.
COPY airflow/airflow.cfg ${AIRFLOW_HOME}/airflow.cfg
COPY requirements.txt /requirements.txt
# --no-cache-dir keeps pip's download cache out of the image layer.
RUN pip install --no-cache-dir -r /requirements.txt

以下是用于部署 webserver 的 docker 配置部分:

网络服务器:

The webserver on the master needs to fetch the log from the worker and display it on the front-end page. To do that it has to resolve the worker's host name. Here the host name evidently cannot be resolved, so add a host-name-to-IP mapping for the worker to /etc/hosts on the master (e.g. with `vim /etc/hosts`).

修复它:

首先,输入以下内容获取配置文件:

# Write the chart's default values to values.yaml.
helm show values apache-airflow/airflow > values.yaml 

之后检查 fixPermissions 是否为真。

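您需要启用持久卷,并把 fixPermissions 设为 true(values.yaml 中的对应部分如下):

workers:
  persistence:
    # Enable persistent volumes
    enabled: true
    # Volume size for worker StatefulSet
    size: 10Gi
    # If using a custom storageClass, pass name ref to all statefulSets here
    storageClassName:
    # Execute init container to chown log directory.
    fixPermissions: true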

通过以下方式更新您的安装:

helm upgrade --install airflow apache-airflow/airflow -n ai