Spark errors with UDFs running on standalone

I am running my Spark program; it works locally but fails when run remotely. My program is made up of these components (containers):

Locally everything works fine, but when I run a transformer that uses a UDF (the other transformers, i.e. the ones without UDFs, work correctly), I get this error remotely:

Caused by: java.lang.ClassCastException: cannot assign instance of scala.collection.immutable.List$SerializationProxy to field org.apache.spark.rdd.RDD.org$apache$spark$rdd$RDD$$dependencies_ of type scala.collection.Seq in instance of org.apache.spark.rdd.MapPartitionsRDD
    at java.io.ObjectStreamClass$FieldReflector.setObjFieldValues(ObjectStreamClass.java:2301)
    at java.io.ObjectStreamClass.setObjFieldValues(ObjectStreamClass.java:1431)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2350)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2268)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2126)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1625)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2344)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2268)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2126)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1625)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:465)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:423)
    at scala.collection.immutable.List$SerializationProxy.readObject(List.scala:490)
    at sun.reflect.GeneratedMethodAccessor3.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    ...
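For context, the failing transformer is just a column transformation built around a UDF. A minimal sketch of that kind of transformer (the object name, column, and logic below are illustrative, not the actual code):

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.udf

// Illustrative transformer: anything that ships a UDF closure to the executors
// goes through the deserialization path that fails on the remote cluster.
object NormalizeNameTransformer {
  private val normalize = udf((s: String) => if (s == null) null else s.trim.toLowerCase)

  def transform(df: DataFrame): DataFrame =
    df.withColumn("name_normalized", normalize(df("name")))
}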

Here is my Spark session code:

import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession

val sparkConf = new SparkConf()
  .setMaster("spark://spark-master:7077")
  .setAppName("My-App")
  // external dependencies are already available on each worker under /dependencies
  .set("spark.executor.extraClassPath", "/dependencies/*")

val spark = SparkSession.builder().config(sparkConf).getOrCreate()

So jobs that only use the external dependencies work fine, but the UDF produces the error above. I also tried adding my application jar (which contains the driver and Spring code, plus all the other dependencies already present on the workers) to the dependencies folder on each worker, but it still fails. I also tried placing it on the workers at the same path as on the driver and adding that location to sparkConf via "spark.jars", with no success. Any suggestions?
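For reference, the "spark.jars" attempt mentioned above looked roughly like this (the jar path is a placeholder); it still produced the same ClassCastException:

import org.apache.spark.SparkConf

// Attempted (and still failing) configuration: distribute the application jar with the job.
val attemptedConf = new SparkConf()
  .setMaster("spark://spark-master:7077")
  .setAppName("My-App")
  .set("spark.executor.extraClassPath", "/dependencies/*")
  .set("spark.jars", "/path/app.jar") // placeholder path to the application jar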

After a lot of googling, I found the solution for integrating Spring Boot and Spark: I needed to change my pom to build an uber-jar with the shade plugin. So I replaced this:

<plugin>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-maven-plugin</artifactId>
    <configuration>
        <fork>true</fork>
        <executable>true</executable>
    </configuration>
    <executions>
        <execution>
            <goals>
                <goal>repackage</goal>
            </goals>
        </execution>
    </executions>
</plugin>

with this:

<plugin>
    <groupId>org.apache.maven.plugins</groupId>
    <artifactId>maven-shade-plugin</artifactId>
    <version>3.2.4</version>
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-maven-plugin</artifactId>
            <version>${spring-boot.version}</version>
        </dependency>
    </dependencies>
    <configuration>
        <keepDependenciesWithProvidedScope>false</keepDependenciesWithProvidedScope>
        <createDependencyReducedPom>false</createDependencyReducedPom>
        <filters>
            <filter>
                <artifact>*:*</artifact>
                <excludes>
                    <exclude>module-info.class</exclude>
                    <exclude>META-INF/*.SF</exclude>
                    <exclude>META-INF/*.DSA</exclude>
                    <exclude>META-INF/*.RSA</exclude>
                </excludes>
            </filter>
        </filters>
        <transformers>
            <transformer implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
                <resource>META-INF/spring.handlers</resource>
            </transformer>
            <transformer implementation="org.springframework.boot.maven.PropertiesMergingResourceTransformer">
                <resource>META-INF/spring.factories</resource>
            </transformer>
            <transformer implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
                <resource>META-INF/spring.schemas</resource>
            </transformer>
            <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
            <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                <mainClass>${start-class}</mainClass>
            </transformer>
        </transformers>
    </configuration>
    <executions>
        <execution>
            <phase>package</phase>
            <goals>
                <goal>shade</goal>
            </goals>
        </execution>
    </executions>
</plugin>

Then I added the project jar to every worker and added these settings to the Spark session:

    "spark.executor.extraClassPath", "/path/app.jar",
    "spark.driver.extraClassPath", "/path/app.jar",
    "spark.jars", "/path/app.jar",