Airflow triggering Spark application results in error "Too large frame"

I have a Docker Compose pipeline with containers for Airflow and Spark. I want to schedule a SparkSubmitOperator job, but it fails with the error java.lang.IllegalArgumentException: Too large frame: 5211883372140375593. The Spark application does nothing but create a Spark session (I have commented out everything else). When I run the Spark application manually (by opening a bash shell in the Spark container and executing spark-submit), everything works! It also works when I create only a SparkContext instead of a SparkSession!

Here is my docker-compose.yml:

version: '3' 


x-airflow-common:
  &airflow-common
  build: ./airflow/
  image: ${AIRFLOW_IMAGE_NAME:-apache/airflow:2.0.2}
  environment:
    &airflow-common-env
    AIRFLOW__CORE__EXECUTOR: CeleryExecutor
    AIRFLOW__CORE__SQL_ALCHEMY_CONN: postgresql+psycopg2://airflow:airflow@postgres/airflow
    AIRFLOW__CELERY__RESULT_BACKEND: db+postgresql://airflow:airflow@postgres/airflow
    AIRFLOW__CELERY__BROKER_URL: redis://:@redis:6379/0
    AIRFLOW__CORE__FERNET_KEY: ''
    AIRFLOW__CORE__DAGS_ARE_PAUSED_AT_CREATION: 'false'
    AIRFLOW__CORE__LOAD_EXAMPLES: 'false'
    AIRFLOW__API__AUTH_BACKEND: 'airflow.api.auth.backend.basic_auth'
    AIRFLOW__CORE__DEFAULT_TIMEZONE: 'Europe/Berlin' 
  volumes:
    - ./airflow/dags:/opt/airflow/dags
    - ./airflow/logs:/opt/airflow/logs
    - ./airflow/plugins:/opt/airflow/plugins
  user: "${AIRFLOW_UID:-50000}:${AIRFLOW_GID:-50000}"
  networks:
    - app-tier
  depends_on:
    redis:
      condition: service_healthy
    postgres:
      condition: service_healthy



services:

  postgres:
    container_name: airflowPostgres
    image: postgres:13
    environment:
      POSTGRES_USER: airflow
      POSTGRES_PASSWORD: airflow
      POSTGRES_DB: airflow
    volumes:
      - postgres-db-volume:/var/lib/postgresql/data
    healthcheck:
      test: ["CMD", "pg_isready", "-U", "airflow"]
      interval: 5s
      retries: 5
    restart: always
    networks:
      - app-tier

  redis:
    container_name: airflowRedis
    image: redis:latest
    ports:
      - 6380:6379
    healthcheck:
      test: ["CMD", "redis-cli", "ping"]
      interval: 5s
      timeout: 30s
      retries: 50
    restart: always
    networks:
      - app-tier

  airflow-webserver:
    <<: *airflow-common
    container_name: airflowWebserver
    command: webserver
    ports:
      - 8081:8080
    healthcheck:
      test: ["CMD", "curl", "--fail", "http://localhost:8080/health"]
      interval: 10s
      timeout: 10s
      retries: 5
    restart: always

  airflow-scheduler:
    <<: *airflow-common
    container_name: airflowScheduler
    command: scheduler
    restart: always

  airflow-worker:
    <<: *airflow-common
    container_name: airflowWorker
    command: celery worker
    restart: always

  airflow-init:
    <<: *airflow-common
    container_name: airflowInit
    command: version
    environment:
      <<: *airflow-common-env
      _AIRFLOW_DB_UPGRADE: 'true'
      _AIRFLOW_WWW_USER_CREATE: 'true'
      _AIRFLOW_WWW_USER_USERNAME: ${_AIRFLOW_WWW_USER_USERNAME:-airflow}
      _AIRFLOW_WWW_USER_PASSWORD: ${_AIRFLOW_WWW_USER_PASSWORD:-airflow}

  spark:
    image: docker.io/bitnami/spark:3
    user: root
    environment:
      - SPARK_MODE=master
      - SPARK_RPC_AUTHENTICATION_ENABLED=no
      - SPARK_RPC_ENCRYPTION_ENABLED=no
      - SPARK_LOCAL_STORAGE_ENCRYPTION_ENABLED=no
      - SPARK_SSL_ENABLED=no
    ports:
      - '8080:8080'
    volumes: 
      - ./:/app
    networks:
      - app-tier

  spark-worker-1:
    image: docker.io/bitnami/spark:3
    environment:
      - SPARK_MODE=worker
      - SPARK_MASTER_URL=spark://spark:7077
      - SPARK_WORKER_MEMORY=1G
      - SPARK_WORKER_CORES=1
      - SPARK_RPC_AUTHENTICATION_ENABLED=no
      - SPARK_RPC_ENCRYPTION_ENABLED=no
      - SPARK_LOCAL_STORAGE_ENCRYPTION_ENABLED=no
      - SPARK_SSL_ENABLED=no
    networks:
      - app-tier

  spark-worker-2:
    image: docker.io/bitnami/spark:3
    environment:
      - SPARK_MODE=worker
      - SPARK_MASTER_URL=spark://spark:7077
      - SPARK_WORKER_MEMORY=1G
      - SPARK_WORKER_CORES=1
      - SPARK_RPC_AUTHENTICATION_ENABLED=no
      - SPARK_RPC_ENCRYPTION_ENABLED=no
      - SPARK_LOCAL_STORAGE_ENCRYPTION_ENABLED=no
      - SPARK_SSL_ENABLED=no
    networks:
      - app-tier
    
volumes:
  postgres-db-volume:

networks:
  app-tier:
    driver: bridge
    name: app-tier

My Airflow DAG:

from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.python import PythonOperator
from functions import send_to_kafka, send_to_mongo


# * AIRFLOW ################################

# default arguments
default_args = {
    'owner': 'daniel',
    'start_date': datetime(2021, 5, 9),
    'email': [''],
    'email_on_failure': False,
    'email_on_retry': False,
    "retries": 3,
    "retry_delay": timedelta(minutes = 1)
}    


# * spark DAG

from airflow.providers.apache.spark.operators.spark_submit import SparkSubmitOperator

dag_spark = DAG('spark', 
                description = '', catchup = False, schedule_interval = "@once", default_args = default_args)

s1 = SparkSubmitOperator(
    task_id = "spark-job",
    application = "/opt/airflow/dags/application.py",
    conn_id = "spark_default", # defined under Admin/Connections in Airflow webserver
    packages = "org.apache.spark:spark-sql-kafka-0-10_2.12:3.1.2,postgresql:postgresql:9.1-901-1.jdbc4",
    dag = dag_spark
)

My application (application.py), which does not work:

from pyspark.sql import SparkSession    

spark = SparkSession \
    .builder \
    .appName("myApp") \
    .getOrCreate()

The application that does work:

from pyspark import SparkContext
sc = SparkContext("local", "First App")

The connection defined in the Airflow Admin menu:
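The screenshot of the connection is not reproduced here. As a minimal sketch, the spark_default connection referenced by the DAG could be expressed in code as follows; the host and port are assumptions based on the compose file above, not values taken from the screenshot:

from airflow.models.connection import Connection

# Sketch only: host and port are assumed from the compose file
# (Spark master service "spark", RPC port 7077), not from the screenshot.
spark_default = Connection(
    conn_id="spark_default",
    conn_type="spark",
    host="spark://spark",
    port=7077,
)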

Here is the log from the DAG run: https://pastebin.com/FMW3kJ9g

Any idea why this fails?

Solved it by adding .master("local") to the SparkSession.
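With that change, the non-working application.py above becomes the following (a minimal sketch; only the .master("local") line is new):

from pyspark.sql import SparkSession

# Same application as above, with the master set explicitly.
# A master set in code takes precedence over the --master value
# that spark-submit (and hence SparkSubmitOperator) passes in.
spark = SparkSession \
    .builder \
    .master("local") \
    .appName("myApp") \
    .getOrCreate()

Note that "local" runs the job inside the driver process rather than on the standalone cluster defined in the compose file.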