ClassCast Exception Encountered When Trying To Connect To Delta Lake From Spark K8s Operator

I have a simple program, shown below:

import pyspark

# Configure the session for Delta Lake: register the Delta SQL extension and catalog.
builder = (
    pyspark.sql.SparkSession.builder.appName("MyApp")
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
    .config(
        "spark.sql.catalog.spark_catalog",
        "org.apache.spark.sql.delta.catalog.DeltaCatalog",
    )
)

spark = builder.getOrCreate()

# Point the gs:// scheme at the GCS connector's filesystem implementations.
spark._jsc.hadoopConfiguration().set(
    "fs.gs.impl", "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem"
)
spark._jsc.hadoopConfiguration().set(
    "fs.AbstractFileSystem.gs.impl", "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFS"
)

# Read the Delta table from GCS and display the first rows.
df = spark.read.format("delta").load(
    "gs://org/delta/bronze/mongodb/registration/audits"
)
df.show()
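
As an aside, the same Hadoop settings can also be supplied through the builder with the spark.hadoop. prefix instead of reaching into the private _jsc handle. A minimal sketch of the equivalent setup (same property names as above):

import pyspark

builder = (
    pyspark.sql.SparkSession.builder.appName("MyApp")
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
    .config(
        "spark.sql.catalog.spark_catalog",
        "org.apache.spark.sql.delta.catalog.DeltaCatalog",
    )
    # spark.hadoop.* entries are copied into the Hadoop configuration when the session starts.
    .config(
        "spark.hadoop.fs.gs.impl",
        "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem",
    )
    .config(
        "spark.hadoop.fs.AbstractFileSystem.gs.impl",
        "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFS",
    )
)
spark = builder.getOrCreate()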

The program is packaged into a container using the following Dockerfile:

FROM varunmallya/spark-pi:3.2.1
USER root
ADD gcs-connector-hadoop2-latest.jar $SPARK_HOME/jars
WORKDIR /app
COPY main.py .

The app is then deployed on k8s as a SparkApplication using the spark-on-k8s operator.

I expected to see 20 rows of data, but instead got this exception:

java.lang.ClassCastException: cannot assign instance of java.lang.invoke.SerializedLambda to field org.apache.spark.sql.catalyst.expressions.ScalaUDF.f of type scala.Function1 in instance of org.apache.spark.sql.catalyst.expressions.ScalaUDF

However, when I run this in a local Jupyter notebook I can see the expected content. I have added the necessary package, io.delta:delta-core_2.12:1.2.0, via the CRD, and have also made sure that gcs-connector-hadoop2-latest.jar is available.
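
For context, a SparkApplication can declare such Maven dependencies under spec.deps. A minimal sketch of how the package might be attached, assuming the spec.deps.packages field of the spark-operator v1beta2 API; the metadata values here are placeholders:

apiVersion: sparkoperator.k8s.io/v1beta2
kind: SparkApplication
metadata:
  name: my-delta-app        # placeholder name
  namespace: spark
spec:
  type: Python
  mode: cluster
  mainApplicationFile: local:///app/main.py
  deps:
    packages:
      - io.delta:delta-core_2.12:1.2.0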

What could be the problem?

Could you try the following Dockerfile:

FROM datamechanics/spark:3.1.1-hadoop-3.2.0-java-8-scala-2.12-python-3.8-dm17
USER root
WORKDIR /app
COPY main.py .

And then try to deploy this SparkApplication:

apiVersion: sparkoperator.k8s.io/v1beta2
kind: SparkApplication
metadata:
  name: sparky-pi
  namespace: spark
spec:
  type: Python
  mode: cluster
  pythonVersion: "3"
  image: <YOUR_IMAGE_GOES_HERE>:latest
  mainApplicationFile: local:///app/main.py
  sparkVersion: "3.1.1"
  restartPolicy:
    type: OnFailure
    onFailureRetries: 3
    onFailureRetryInterval: 10
    onSubmissionFailureRetries: 5
    onSubmissionFailureRetryInterval: 20
  driver:
    cores: 1
    coreLimit: "1200m"
    memory: "512m"
    labels:
      version: 3.1.1
    serviceAccount: spark
  executor:
    serviceAccount: spark
    cores: 1
    instances: 1
    memory: "512m"
    labels:
      version: 3.1.1

I ran this on my Kubernetes cluster and was able to get the expected output.

I think the base image datamechanics/spark:3.1.1-hadoop-3.2.0-java-8-scala-2.12-python-3.8-dm17 is the key: this kind of SerializedLambda ClassCastException usually points to mismatched Spark/Scala versions between the driver, the executors, and the submitted dependencies, and this image pins Spark 3.1.1, Hadoop 3.2.0, Scala 2.12 and Python 3.8 consistently. Props to the people who put it together!
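
If you want to double-check that an image and your Delta package line up, a small probe like the one below can be dropped into the application. It is just a sketch that prints the Spark and Scala versions the driver sees; the Scala lookup goes through the py4j gateway, so treat it as best-effort:

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("version-probe").getOrCreate()

# Spark version reported by the running session.
print("Spark:", spark.version)

# Scala version on the JVM classpath, fetched through py4j;
# it should match the _2.12 suffix of io.delta:delta-core_2.12:1.2.0.
print("Scala:", spark.sparkContext._jvm.scala.util.Properties.versionString())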

Source: https://towardsdatascience.com/optimized-docker-images-for-apache-spark-now-public-on-dockerhub-1f9f8fed1665