无法为 apache 气流安装附加要求

Cannot install additional requirements to apache airflow

我正在使用以下 docker-compose 图像,我从以下位置获得此图像:https://github.com/apache/airflow/blob/main/docs/apache-airflow/start/docker-compose.yaml

version: "3"
x-airflow-common: &airflow-common
  image: ${AIRFLOW_IMAGE_NAME:-apache/airflow:2.0.0-python3.7}
  environment: &airflow-common-env
    AIRFLOW__CORE__EXECUTOR: CeleryExecutor
    AIRFLOW__CORE__SQL_ALCHEMY_CONN: postgresql+psycopg2://airflow:airflow@postgres/airflow
    AIRFLOW__CELERY__RESULT_BACKEND: db+postgresql://airflow:airflow@postgres/airflow
    AIRFLOW__CELERY__BROKER_URL: redis://:@redis:6379/0
    AIRFLOW__CORE__FERNET_KEY: ""
    AIRFLOW__CORE__DAGS_ARE_PAUSED_AT_CREATION: "true"
    AIRFLOW__CORE__LOAD_EXAMPLES: "false"
    AIRFLOW__API__AUTH_BACKEND: "airflow.api.auth.backend.basic_auth"
    _PIP_ADDITIONAL_REQUIREMENTS: ${_PIP_ADDITIONAL_REQUIREMENTS:-apache-airflow-providers-apache-spark}
  volumes:
    - ./dags:/opt/airflow/dags
    - ./logs:/opt/airflow/logs
    - ./plugins:/opt/airflow/plugins
  user: "${AIRFLOW_UID:-50000}:${AIRFLOW_GID:-50000}"
  depends_on: &airflow-common-depends-on
    redis:
      condition: service_healthy
    postgres:
      condition: service_healthy

services:
  postgres:
    image: postgres:13
    environment:
      POSTGRES_USER: airflow
      POSTGRES_PASSWORD: airflow
      POSTGRES_DB: airflow
    volumes:
      - postgres-db-volume:/var/lib/postgresql/data
    healthcheck:
      test: ["CMD", "pg_isready", "-U", "airflow"]
      interval: 5s
      retries: 5
    restart: always

  redis:
    image: redis:latest
    ports:
      - 6379:6379
    healthcheck:
      test: ["CMD", "redis-cli", "ping"]
      interval: 5s
      timeout: 30s
      retries: 50
    restart: always

  airflow-webserver:
    <<: *airflow-common
    command: webserver
    ports:
      - 8080:8080
    healthcheck:
      test: ["CMD", "curl", "--fail", "http://localhost:8080/health"]
      interval: 10s
      timeout: 10s
      retries: 5
    restart: always
    depends_on:
      <<: *airflow-common-depends-on
      airflow-init:
        condition: service_completed_successfully

  airflow-scheduler:
    <<: *airflow-common
    command: scheduler
    healthcheck:
      test:
        [
          "CMD-SHELL",
          'airflow jobs check --job-type SchedulerJob --hostname "$${HOSTNAME}"',
        ]
      interval: 10s
      timeout: 10s
      retries: 5
    restart: always
    depends_on:
      <<: *airflow-common-depends-on
      airflow-init:
        condition: service_completed_successfully

  airflow-worker:
    <<: *airflow-common
    command: celery worker
    healthcheck:
      test:
        - "CMD-SHELL"
        - 'celery --app airflow.executors.celery_executor.app inspect ping -d "celery@$${HOSTNAME}"'
      interval: 10s
      timeout: 10s
      retries: 5
    restart: always
    depends_on:
      <<: *airflow-common-depends-on
      airflow-init:
        condition: service_completed_successfully

  airflow-init:
    <<: *airflow-common
    command: version
    environment:
      <<: *airflow-common-env
      _AIRFLOW_DB_UPGRADE: "true"
      _AIRFLOW_WWW_USER_CREATE: "true"
      _AIRFLOW_WWW_USER_USERNAME: ${_AIRFLOW_WWW_USER_USERNAME:-airflow}
      _AIRFLOW_WWW_USER_PASSWORD: ${_AIRFLOW_WWW_USER_PASSWORD:-airflow}

  flower:
    <<: *airflow-common
    command: celery flower
    ports:
      - 5555:5555
    healthcheck:
      test: ["CMD", "curl", "--fail", "http://localhost:5555/"]
      interval: 10s
      timeout: 10s
      retries: 5
    restart: always
    depends_on:
      <<: *airflow-common-depends-on
      airflow-init:
        condition: service_completed_successfully

  ######################################################
  # SPARK SERVICES
  ######################################################

  jupyterlab:
    image: andreper/jupyterlab:3.0.0-spark-3.0.0
    container_name: jupyterlab
    ports:
      - 8888:8888
      - 4040:4040
    volumes:
      - shared-workspace:/opt/workspace
  spark-master:
    image: andreper/spark-master:3.0.0
    container_name: spark-master
    ports:
      - 8081:8080
      - 7077:7077
    volumes:
      - shared-workspace:/opt/workspace
  spark-worker-1:
    image: andreper/spark-worker:3.0.0
    container_name: spark-worker-1
    environment:
      - SPARK_WORKER_CORES=1
      - SPARK_WORKER_MEMORY=512m
    ports:
      - 8082:8081
    volumes:
      - shared-workspace:/opt/workspace
    depends_on:
      - spark-master
  spark-worker-2:
    image: andreper/spark-worker:3.0.0
    container_name: spark-worker-2
    environment:
      - SPARK_WORKER_CORES=1
      - SPARK_WORKER_MEMORY=512m
    ports:
      - 8083:8081
    volumes:
      - shared-workspace:/opt/workspace
    depends_on:
      - spark-master

volumes:
  postgres-db-volume:
  shared-workspace:
    name: "jordi_airflow"
    driver: local
    driver_opts:
      type: "none"
      o: "bind"
      device: "/Users/jordicrespoguzman/Projects/custom_airflow_spark/spark_folder"

我正在尝试 运行 以下 DAG:

from airflow import DAG
from airflow.providers.http.sensors.http import HttpSensor
from airflow.sensors.filesystem import FileSensor
from airflow.operators.python import PythonOperator
from airflow.operators.bash import BashOperator

from airflow.providers.apache.spark.operators.spark_submit import SparkSubmitOperator
from airflow.operators.email import EmailOperator

from datetime import datetime, timedelta
import csv
import requests
import json

default_args = {
    "owner": "airflow",
    "email_on_failure": False,
    "email_on_retry": False,
    "email": "admin@localhost.com",
    "retries": 1,
    "retry_delay": timedelta(minutes=5),
}


def printar():
    print("success!")


with DAG(
    "forex_data_pipeline",
    start_date=datetime(2021, 1, 1),
    schedule_interval="@daily",
    default_args=default_args,
    catchup=False,
) as dag:

    downloading_rates = PythonOperator(task_id="test1", python_callable=printar)

    forex_processing = SparkSubmitOperator(
        task_id="spark1",
        application="/opt/airflow/dags/test.py",
        conn_id="spark_conn",
        verbose=False,
    )

    downloading_rates  >> forex_processing

但我在气流中看到这个错误 ui:

Broken DAG: [/opt/airflow/dags/dag_spark.py] Traceback (most recent call last):
  File "<frozen importlib._bootstrap>", line 219, in _call_with_frames_removed
  File "/opt/airflow/dags/dag_spark.py", line 7, in <module>
    from airflow.providers.apache.spark.operators.spark_submit import SparkSubmitOperator
ModuleNotFoundError: No module named 'airflow.providers.apache'

我已指定在 docker-compose 文件中安装额外的 requirements:

_PIP_ADDITIONAL_REQUIREMENTS: ${_PIP_ADDITIONAL_REQUIREMENTS:-apache-airflow-providers-apache-spark}

我写错了吗?我应该如何指定我想在气流中安装的附加 requirements?我可以传递 requirements.txt 吗?如果是这样,我如何指定路径?

_PIP_ADDITIONAL_REQUIREMENTS 环境变量的支持尚未发布。它仅受 docker 图像的 developer/unreleased 版本支持。计划在 Airflow 2.1.1 中提供此功能。有关详细信息,请参阅:Adding extra requirements for build and runtime of the PROD image.

对于旧版本,您应该构建一个新的镜像并在docker-compose.yaml中设置这个镜像。为此,您需要执行几个步骤。

  1. 使用以下内容创建一个新的 Dockerfile
    FROM apache/airflow:2.0.0
    RUN pip install --no-cache-dir apache-airflow-providers
    
  2. 建立新形象:
    docker build . --tag my-company-airflow:2.0.0
    
  3. docker-compose.yaml 文件中设置此图像:
    echo "AIRFLOW_IMAGE_NAME=my-company-airflow:2.0.0" >> .env
    

有关详细信息,请参阅: Official guide about running Airflow in docker-compose environment

我特别推荐这个片段,它描述了安装新 pip 包时需要做什么。

ModuleNotFoundError: No module named 'XYZ'

The Docker Compose file uses the latest Airflow image (apache/airflow). If you need to install a new Python library or system library, you can customize and extend it.

我建议您查看有关 building Docker Image 的指南。这解释了如何安装更复杂的依赖项。

我还建议仅使用 Docker-compose 官方网站上的文件并针对特定版本。 Docker-来自较新版本的 compose 文件可能不适用于旧版本的 Airflow,因为我们一直在对这些文件进行许多改进以提高稳定性可靠性和用户体验。

我用过:

_PIP_ADDITIONAL_REQUIREMENTS: ${_PIP_ADDITIONAL_REQUIREMENTS:- package1 package2 package3 }

# Example
_PIP_ADDITIONAL_REQUIREMENTS: ${_PIP_ADDITIONAL_REQUIREMENTS:- apache-airflow-providers-apache-spark}

# or:
_PIP_ADDITIONAL_REQUIREMENTS: ${_PIP_ADDITIONAL_REQUIREMENTS:- apache-airflow-providers-oracle apache-airflow-providers-microsoft-mssql}

(注:加1space)

然后当 airflow docker 首先 运行 Python, 它会自动 pip 安装需求包。

[参考]

  • Running Airflow in Docker — Airflow Documentation
  • _PIP_ADDITIONAL_REQUIREMENTS
    • 如果不为空,airflow 容器将尝试安装变量中指定的要求。示例:lxml==4.6.3 charset-normalizer==1.4.1。适用于 Airflow 图像 2.1.1 及更高版本。