独立使用 UDF 运行 的 Spark 错误
Spark errors with UDFs running on standalone
我是 运行 我的 spark 程序,它在本地运行但不能远程运行。
我的程序有这些组件(容器):
- 我的应用程序基于 spring(用于 REST 调用)启动驱动程序(使用 getOrCreate 的 Spark 会话)并具有我构建的所有转换器。
- 基于 bitnami 图像的 Spark Master。
- Spark Worker 基于 bitnami 图像,但也有我的应用程序的所有依赖项(即 /dependencies 目录下的所有 jar)。
在本地,一切都很好,但是当 运行 带有 UDF 的转换器(其余转换器(即没有 UDF)工作正常)时,我在远程遇到了这个错误:
Caused by: java.lang.ClassCastException: cannot assign instance of scala.collection.immutable.List$SerializationProxy to field org.apache.spark.rdd.RDD.org$apache$spark$rdd$RDD$$dependencies_ of type scala.collection.Seq in instance of org.apache.spark.rdd.MapPartitionsRDD at java.io.ObjectStreamClass$FieldReflector.setObjFieldValues(ObjectStreamClass.java:2301) at java.io.ObjectStreamClass.setObjFieldValues(ObjectStreamClass.java:1431) at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2350) at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2268) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2126) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1625) at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2344) at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2268) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2126) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1625) at java.io.ObjectInputStream.readObject(ObjectInputStream.java:465) at java.io.ObjectInputStream.readObject(ObjectInputStream.java:423) at scala.collection.immutable.List$SerializationProxy.readObject(List.scala:490) at sun.reflect.GeneratedMethodAccessor3.invoke(Unknown Source) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
...
这是我的 spark 会话代码:
val sparkConf = new SparkConf()
.setMaster("spark://spark-master:7077")
.setAppName("My-App")
.set("spark.executor.extraClassPath", "/dependencies/*")
val spark = SparkSession.builder().config(sparkConf).getOrCreate()
因此,具有外部依赖项的作业工作正常,但 UDF 会产生上述错误。
我还尝试将我的应用程序 jar(其中包含驱动程序和 spring 代码以及 worker 中已经存在的所有其他依赖项)添加到 worker 到其中的 dependencies 文件夹,但仍然会产生错误.还尝试将其放置在与驱动程序相同位置的工作人员中,并使用“spark.jars”将其位置添加到 sparkConf,但没有成功。
有什么建议吗?
经过大量谷歌搜索后,我找到了如何集成 Spring-Boot 和 Spark 的解决方案。
我需要更改我的 pom 以使用 shade 插件制作一个 uber-jar。所以我替换了这个:
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
<configuration>
<fork>true</fork>
<executable>true</executable>
</configuration>
<executions>
<execution>
<goals>
<goal>repackage</goal>
</goals>
</execution>
</executions>
</plugin>
与:
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>3.2.4</version>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
<version>${spring-boot.version}</version>
</dependency>
</dependencies>
<configuration>
<keepDependenciesWithProvidedScope>false</keepDependenciesWithProvidedScope>
<createDependencyReducedPom>false</createDependencyReducedPom>
<filters>
<filter>
<artifact>*:*</artifact>
<excludes>
<exclude>module-info.class</exclude>
<exclude>META-INF/*.SF</exclude>
<exclude>META-INF/*.DSA</exclude>
<exclude>META-INF/*.RSA</exclude>
</excludes>
</filter>
</filters>
<transformers>
<transformer
implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
<resource>META-INF/spring.handlers</resource>
</transformer>
<transformer
implementation="org.springframework.boot.maven.PropertiesMergingResourceTransformer">
<resource>META-INF/spring.factories</resource>
</transformer>
<transformer
implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
<resource>META-INF/spring.schemas</resource>
</transformer>
<transformer
implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer" />
<transformer
implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
<mainClass>${start-class}</mainClass>
</transformer>
</transformers>
</configuration>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
</execution>
</executions>
</plugin>
然后,我将项目 jar 添加到每个 worker,并将这些配置添加到 Spark Session:
"spark.executor.extraClassPath", "/path/app.jar",
"spark.driver.extraClassPath", "/path/app.jar",
"spark.jars", "/path/app.jar",
我是 运行 我的 spark 程序,它在本地运行但不能远程运行。 我的程序有这些组件(容器):
- 我的应用程序基于 spring(用于 REST 调用)启动驱动程序(使用 getOrCreate 的 Spark 会话)并具有我构建的所有转换器。
- 基于 bitnami 图像的 Spark Master。
- Spark Worker 基于 bitnami 图像,但也有我的应用程序的所有依赖项(即 /dependencies 目录下的所有 jar)。
在本地,一切都很好,但是当 运行 带有 UDF 的转换器(其余转换器(即没有 UDF)工作正常)时,我在远程遇到了这个错误:
Caused by: java.lang.ClassCastException: cannot assign instance of scala.collection.immutable.List$SerializationProxy to field org.apache.spark.rdd.RDD.org$apache$spark$rdd$RDD$$dependencies_ of type scala.collection.Seq in instance of org.apache.spark.rdd.MapPartitionsRDD at java.io.ObjectStreamClass$FieldReflector.setObjFieldValues(ObjectStreamClass.java:2301) at java.io.ObjectStreamClass.setObjFieldValues(ObjectStreamClass.java:1431) at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2350) at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2268) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2126) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1625) at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2344) at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2268) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2126) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1625) at java.io.ObjectInputStream.readObject(ObjectInputStream.java:465) at java.io.ObjectInputStream.readObject(ObjectInputStream.java:423) at scala.collection.immutable.List$SerializationProxy.readObject(List.scala:490) at sun.reflect.GeneratedMethodAccessor3.invoke(Unknown Source) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ...
这是我的 spark 会话代码:
val sparkConf = new SparkConf()
.setMaster("spark://spark-master:7077")
.setAppName("My-App")
.set("spark.executor.extraClassPath", "/dependencies/*")
val spark = SparkSession.builder().config(sparkConf).getOrCreate()
因此,具有外部依赖项的作业工作正常,但 UDF 会产生上述错误。 我还尝试将我的应用程序 jar(其中包含驱动程序和 spring 代码以及 worker 中已经存在的所有其他依赖项)添加到 worker 到其中的 dependencies 文件夹,但仍然会产生错误.还尝试将其放置在与驱动程序相同位置的工作人员中,并使用“spark.jars”将其位置添加到 sparkConf,但没有成功。 有什么建议吗?
经过大量谷歌搜索后,我找到了如何集成 Spring-Boot 和 Spark 的解决方案。 我需要更改我的 pom 以使用 shade 插件制作一个 uber-jar。所以我替换了这个:
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
<configuration>
<fork>true</fork>
<executable>true</executable>
</configuration>
<executions>
<execution>
<goals>
<goal>repackage</goal>
</goals>
</execution>
</executions>
</plugin>
与:
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>3.2.4</version>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
<version>${spring-boot.version}</version>
</dependency>
</dependencies>
<configuration>
<keepDependenciesWithProvidedScope>false</keepDependenciesWithProvidedScope>
<createDependencyReducedPom>false</createDependencyReducedPom>
<filters>
<filter>
<artifact>*:*</artifact>
<excludes>
<exclude>module-info.class</exclude>
<exclude>META-INF/*.SF</exclude>
<exclude>META-INF/*.DSA</exclude>
<exclude>META-INF/*.RSA</exclude>
</excludes>
</filter>
</filters>
<transformers>
<transformer
implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
<resource>META-INF/spring.handlers</resource>
</transformer>
<transformer
implementation="org.springframework.boot.maven.PropertiesMergingResourceTransformer">
<resource>META-INF/spring.factories</resource>
</transformer>
<transformer
implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
<resource>META-INF/spring.schemas</resource>
</transformer>
<transformer
implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer" />
<transformer
implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
<mainClass>${start-class}</mainClass>
</transformer>
</transformers>
</configuration>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
</execution>
</executions>
</plugin>
然后,我将项目 jar 添加到每个 worker,并将这些配置添加到 Spark Session:
"spark.executor.extraClassPath", "/path/app.jar",
"spark.driver.extraClassPath", "/path/app.jar",
"spark.jars", "/path/app.jar",