在 Spring 中使用 Spark SQL 使用 Avro 引导应用程序

Using Spark SQL from within Spring Boot application with Avro

我正在尝试编写 Spring 引导应用程序,该应用程序 运行 是针对 Spark 主服务器的 Spark SQL 查询。这是应该 运行 查询的代码:

public class SparkJob {

    public void run() {
        SparkConf conf = new SparkConf()
            .setMaster("spark://10.20.30.50:7077")
            .setAppName("show_avro_data");
        JavaSparkContext sc = new JavaSparkContext(conf);
        SQLContext sqlContext = new SQLContext(sc);

        DataFrame df = sqlContext.read()
            .format("com.databricks.spark.avro")
            .load("hdfs://localhost:8020/test.avro");

        df.show();
    }

}

run() 方法是从 Spring-MVC 控制器中调用的。它应该执行 Spark 查询 "online" - 即我不想将应用程序上传到 Spark,例如spark-submit.

当我现在启动应用程序和 运行 方法时,我得到以下堆栈跟踪:

2016-11-08 17:38:18.805 ERROR 43109 --- [nio-9003-exec-1] o.a.c.c.C.[.[.[/].[dispatcherServlet]: 
  Servlet.service() for servlet [dispatcherServlet] in context with path [] 
  threw exception [Request processing failed; nested exception is 
  org.apache.spark.SparkException: Job aborted due to stage failure: 
  Task 0 in stage 0.0 failed 4 times, most recent failure: 
  Lost task 0.3 in stage 0.0 (TID 3, 10.20.30.50): 
  java.lang.ClassNotFoundException:
  com.databricks.spark.avro.AvroRelation$$anonfun$buildScan$$anonfun
    at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
    at java.lang.Class.forName0(Native Method)
    at java.lang.Class.forName(Class.java:348)
    at org.apache.spark.serializer.JavaDeserializationStream$$anon.resolveClass(JavaSerializer.scala:67)
    at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1620)
    at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1521)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1781)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1353)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2018)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1942)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1808)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1353)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2018)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1942)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1808)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1353)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2018)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1942)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1808)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1353)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:373)
    at scala.collection.immutable.$colon$colon.readObject(List.scala:362)
    at sun.reflect.GeneratedMethodAccessor2.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1058)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1909)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1808)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1353)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2018)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1942)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1808)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1353)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2018)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1942)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1808)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1353)
    at java.io.ObjectInputStream.readArray(ObjectInputStream.java:1714)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1347)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2018)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1942)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1808)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1353)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2018)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1942)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1808)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1353)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2018)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1942)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1808)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1353)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:373)
    at scala.collection.immutable.$colon$colon.readObject(List.scala:362)
    at sun.reflect.GeneratedMethodAccessor2.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1058)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1909)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1808)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1353)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2018)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1942)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1808)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1353)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2018)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1942)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1808)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1353)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:373)
    at scala.collection.immutable.$colon$colon.readObject(List.scala:362)
    at sun.reflect.GeneratedMethodAccessor2.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1058)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1909)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1808)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1353)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2018)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1942)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1808)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1353)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2018)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1942)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1808)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1353)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:373)
    at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:72)
    at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:98)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
    at org.apache.spark.scheduler.Task.run(Task.scala:88)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)

由于我使用的是 Spark 版本 1.5.2 和 Scala 版本 2.10.4 我使用了以下 Maven 依赖项 according to the spark-avro documentation:

<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-core_2.10</artifactId>
    <version>1.5.2</version>
</dependency>

<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-sql_2.10</artifactId>
    <version>1.5.2</version>
</dependency>

<dependency>
    <groupId>com.databricks</groupId>
    <artifactId>spark-avro_2.10</artifactId>
    <version>2.0.1</version>
</dependency>

我想,我必须以某种方式在 Spark master 上提供 Avro 库 - 但我不知道如何做到这一点以及我在互联网上可以找到的所有示例都没有提供任何解决这个问题的方法.

编辑

应用程序工作正常,当我使用 JSON 而不是 Avro 和

DataFrame df = sqlContext.read().json("hdfs://...");

由于您是从 spring 应用程序调用 spark(您没有使用 spark-submit),因此您正在尝试将 MVC 用作 "driver" 和 运行 spark 作业在独立的火花上。

所以你想用spring MVC作为驱动,并且在client模式下。对吗?

以下依赖项仅在您的 MVC spring project 中(在 spark 驱动程序 client 端),因此您需要将此库添加到独立的 spark(对工作人员)。

<dependency>
    <groupId>com.databricks</groupId>
    <artifactId>spark-avro_2.10</artifactId>
    <version>2.0.1</version>
</dependency>

您可以 addJar spark-avro_2.10.jar 到 HDFS 并添加到 SparkContext 像这样的东西:

 JavaSparkContext sc = new JavaSparkContext(conf);
 sc.addJar("pathToAvroOnHDFS/spark-avro_2.10.jar")

现在 spark 知道 spark-avro_2.10.jar

另一种选择是将此 jar 添加到 spark 类路径中。您可以检查以下属性(spark 1.5.2 也应该支持这些属性):

spark.driver.extraClassPath /avroPath.jar
spark.executor.extraClassPath /avroPath.jar