Airflow triggering Spark application results in error "Too large frame"
I have a docker-compose pipeline with containers for Airflow and Spark. I want to schedule a SparkSubmitOperator job, but it fails with the error java.lang.IllegalArgumentException: Too large frame: 5211883372140375593.

The Spark application does nothing but create a Spark session (I have commented everything else out). When I run the application manually (by opening a bash shell in the Spark container and calling spark-submit), it works fine. It also works when I create only a SparkContext instead of a SparkSession.
Here is my docker-compose.yml:
version: '3'

x-airflow-common:
  &airflow-common
  build: ./airflow/
  image: ${AIRFLOW_IMAGE_NAME:-apache/airflow:2.0.2}
  environment:
    &airflow-common-env
    AIRFLOW__CORE__EXECUTOR: CeleryExecutor
    AIRFLOW__CORE__SQL_ALCHEMY_CONN: postgresql+psycopg2://airflow:airflow@postgres/airflow
    AIRFLOW__CELERY__RESULT_BACKEND: db+postgresql://airflow:airflow@postgres/airflow
    AIRFLOW__CELERY__BROKER_URL: redis://:@redis:6379/0
    AIRFLOW__CORE__FERNET_KEY: ''
    AIRFLOW__CORE__DAGS_ARE_PAUSED_AT_CREATION: 'false'
    AIRFLOW__CORE__LOAD_EXAMPLES: 'false'
    AIRFLOW__API__AUTH_BACKEND: 'airflow.api.auth.backend.basic_auth'
    AIRFLOW__CORE__DEFAULT_TIMEZONE: 'Europe/Berlin'
  volumes:
    - ./airflow/dags:/opt/airflow/dags
    - ./airflow/logs:/opt/airflow/logs
    - ./airflow/plugins:/opt/airflow/plugins
  user: "${AIRFLOW_UID:-50000}:${AIRFLOW_GID:-50000}"
  networks:
    - app-tier
  depends_on:
    redis:
      condition: service_healthy
    postgres:
      condition: service_healthy

services:
  postgres:
    container_name: airflowPostgres
    image: postgres:13
    environment:
      POSTGRES_USER: airflow
      POSTGRES_PASSWORD: airflow
      POSTGRES_DB: airflow
    volumes:
      - postgres-db-volume:/var/lib/postgresql/data
    healthcheck:
      test: ["CMD", "pg_isready", "-U", "airflow"]
      interval: 5s
      retries: 5
    restart: always
    networks:
      - app-tier

  redis:
    container_name: airflowRedis
    image: redis:latest
    ports:
      - 6380:6379
    healthcheck:
      test: ["CMD", "redis-cli", "ping"]
      interval: 5s
      timeout: 30s
      retries: 50
    restart: always
    networks:
      - app-tier

  airflow-webserver:
    <<: *airflow-common
    container_name: airflowWebserver
    command: webserver
    ports:
      - 8081:8080
    healthcheck:
      test: ["CMD", "curl", "--fail", "http://localhost:8080/health"]
      interval: 10s
      timeout: 10s
      retries: 5
    restart: always

  airflow-scheduler:
    <<: *airflow-common
    container_name: airflowScheduler
    command: scheduler
    restart: always

  airflow-worker:
    <<: *airflow-common
    container_name: airflowWorker
    command: celery worker
    restart: always

  airflow-init:
    <<: *airflow-common
    container_name: airflowInit
    command: version
    environment:
      <<: *airflow-common-env
      _AIRFLOW_DB_UPGRADE: 'true'
      _AIRFLOW_WWW_USER_CREATE: 'true'
      _AIRFLOW_WWW_USER_USERNAME: ${_AIRFLOW_WWW_USER_USERNAME:-airflow}
      _AIRFLOW_WWW_USER_PASSWORD: ${_AIRFLOW_WWW_USER_PASSWORD:-airflow}

  spark:
    image: docker.io/bitnami/spark:3
    user: root
    environment:
      - SPARK_MODE=master
      - SPARK_RPC_AUTHENTICATION_ENABLED=no
      - SPARK_RPC_ENCRYPTION_ENABLED=no
      - SPARK_LOCAL_STORAGE_ENCRYPTION_ENABLED=no
      - SPARK_SSL_ENABLED=no
    ports:
      - '8080:8080'
    volumes:
      - ./:/app
    networks:
      - app-tier

  spark-worker-1:
    image: docker.io/bitnami/spark:3
    environment:
      - SPARK_MODE=worker
      - SPARK_MASTER_URL=spark://spark:7077
      - SPARK_WORKER_MEMORY=1G
      - SPARK_WORKER_CORES=1
      - SPARK_RPC_AUTHENTICATION_ENABLED=no
      - SPARK_RPC_ENCRYPTION_ENABLED=no
      - SPARK_LOCAL_STORAGE_ENCRYPTION_ENABLED=no
      - SPARK_SSL_ENABLED=no
    networks:
      - app-tier

  spark-worker-2:
    image: docker.io/bitnami/spark:3
    environment:
      - SPARK_MODE=worker
      - SPARK_MASTER_URL=spark://spark:7077
      - SPARK_WORKER_MEMORY=1G
      - SPARK_WORKER_CORES=1
      - SPARK_RPC_AUTHENTICATION_ENABLED=no
      - SPARK_RPC_ENCRYPTION_ENABLED=no
      - SPARK_LOCAL_STORAGE_ENCRYPTION_ENABLED=no
      - SPARK_SSL_ENABLED=no
    networks:
      - app-tier

volumes:
  postgres-db-volume:

networks:
  app-tier:
    driver: bridge
    name: app-tier
My Airflow DAG:
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.python import PythonOperator
from functions import send_to_kafka, send_to_mongo

# * AIRFLOW ################################
# default arguments
default_args = {
    'owner': 'daniel',
    'start_date': datetime(2021, 5, 9),
    'email': [''],
    'email_on_failure': False,
    'email_on_retry': False,
    "retries": 3,
    "retry_delay": timedelta(minutes = 1)
}

# * spark DAG
from airflow.providers.apache.spark.operators.spark_submit import SparkSubmitOperator

dag_spark = DAG('spark',
                description = '', catchup = False, schedule_interval = "@once", default_args = default_args)

s1 = SparkSubmitOperator(
    task_id = "spark-job",
    application = "/opt/airflow/dags/application.py",
    conn_id = "spark_default",  # defined under Admin/Connections in the Airflow webserver
    packages = "org.apache.spark:spark-sql-kafka-0-10_2.12:3.1.2,postgresql:postgresql:9.1-901-1.jdbc4",
    dag = dag_spark
)
My application (application.py), which does not work:
from pyspark.sql import SparkSession

spark = SparkSession \
    .builder \
    .appName("myApp") \
    .getOrCreate()
The application that does work (note that here the master is passed explicitly as "local"):
from pyspark import SparkContext
sc = SparkContext("local", "First App")
The connection defined in the Airflow Admin menu:
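(The screenshot of that connection is not reproduced here. Purely as an illustration of the fields involved, the spark_default connection referenced by the operator would look roughly like this when expressed with Airflow's Connection model; the host and port below are assumptions based on the compose file, not values taken from the screenshot.)

from airflow.models import Connection

# Hypothetical sketch only -- the real values live under Admin/Connections in the UI.
spark_default = Connection(
    conn_id="spark_default",
    conn_type="spark",
    host="spark://spark",  # Spark master service name from docker-compose (assumed)
    port=7077,             # default standalone master port (assumed)
)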
Here is the log produced by the DAG run: https://pastebin.com/FMW3kJ9g
Any idea why this fails?
Adding .master("local") to the SparkSession builder solved the problem.
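"Too large frame" typically appears when the driver speaks Spark's RPC protocol to an endpoint that answers with a different protocol, so pinning the master explicitly sidesteps whatever master URL the submit was otherwise resolving. For reference, a minimal sketch of application.py with the fix from this answer applied (nothing else changed from the snippet in the question):

from pyspark.sql import SparkSession

# Setting the master explicitly is the fix described above; without it the
# job failed with "Too large frame" when launched through SparkSubmitOperator.
spark = SparkSession \
    .builder \
    .master("local") \
    .appName("myApp") \
    .getOrCreate()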