ClassCast Exception Encountered When Trying To Connect To Delta Lake From Spark K8s Operator
I have a simple program, shown below:
import pyspark
builder = (
    pyspark.sql.SparkSession.builder.appName("MyApp")
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
    .config(
        "spark.sql.catalog.spark_catalog",
        "org.apache.spark.sql.delta.catalog.DeltaCatalog",
    )
)
spark = builder.getOrCreate()
spark._jsc.hadoopConfiguration().set(
    "fs.gs.impl", "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem"
)
spark._jsc.hadoopConfiguration().set(
    "fs.AbstractFileSystem.gs.impl", "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFS"
)
df = spark.read.format("delta").load(
    "gs://org/delta/bronze/mongodb/registration/audits"
)
print(df.show())
It is packaged into a container using the Dockerfile below:
FROM varunmallya/spark-pi:3.2.1
USER root
ADD gcs-connector-hadoop2-latest.jar $SPARK_HOME/jars
WORKDIR /app
COPY main.py .
The app is then deployed on k8s as a SparkApplication using the spark-on-k8s operator. I expected to see 20 rows of data, but instead got this exception:
java.lang.ClassCastException: cannot assign instance of java.lang.invoke.SerializedLambda to field org.apache.spark.sql.catalyst.expressions.ScalaUDF.f of type scala.Function1 in instance of org.apache.spark.sql.catalyst.expressions.ScalaUDF
However, when I run this in a local Jupyter notebook I can see the expected data. I have added the necessary package - io.delta:delta-core_2.12:1.2.0 - through the CRD, and have also made sure that gcs-connector-hadoop2-latest.jar is available.
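For reference, the Delta package is declared through the operator's dependency section; a minimal sketch of that part of the spec, assuming the spec.deps.packages field of the spark-on-k8s-operator CRD (values shown are illustrative):

spec:
  deps:
    packages:
      # Maven coordinate, resolved by spark-submit the same way as --packages
      - io.delta:delta-core_2.12:1.2.0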
What could be the issue?
Could you try the Dockerfile below:
FROM datamechanics/spark:3.1.1-hadoop-3.2.0-java-8-scala-2.12-python-3.8-dm17
USER root
WORKDIR /app
COPY main.py .
Then try deploying this SparkApplication:
apiVersion: sparkoperator.k8s.io/v1beta2
kind: SparkApplication
metadata:
  name: sparky-pi
  namespace: spark
spec:
  type: Python
  mode: cluster
  pythonVersion: "3"
  image: <YOUR_IMAGE_GOES_HERE>:latest
  mainApplicationFile: local:///app/main.py
  sparkVersion: "3.1.1"
  restartPolicy:
    type: OnFailure
    onFailureRetries: 3
    onFailureRetryInterval: 10
    onSubmissionFailureRetries: 5
    onSubmissionFailureRetryInterval: 20
  driver:
    cores: 1
    coreLimit: "1200m"
    memory: "512m"
    labels:
      version: 3.1.1
    serviceAccount: spark
  executor:
    serviceAccount: spark
    cores: 1
    instances: 1
    memory: "512m"
    labels:
      version: 3.1.1
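Assuming you have kubectl access to the cluster and the operator is installed in the spark namespace, the manifest can be applied and the driver logs inspected with something like the following (the file name sparky-pi.yaml is a placeholder; the operator normally names the driver pod <app-name>-driver):

kubectl apply -f sparky-pi.yaml
kubectl get sparkapplication sparky-pi -n spark
kubectl logs sparky-pi-driver -n spark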
I ran this on my Kubernetes cluster and was able to get the expected output. I think the base image datamechanics/spark:3.1.1-hadoop-3.2.0-java-8-scala-2.12-python-3.8-dm17 is the key here. Kudos to the folks who put it together!