集成 Spark Streaming 和 Apache Ignite 时出现 Class Not Found 错误

Class Not Found Error when Integrating Spark Streaming and Apache Ignite

我正在尝试将 Apache Ignite 集成到现有的用 Java 编写的 Spark Streaming 项目中,该项目计算文本文件中的单词数。但是,当我添加 ignite-spark 的依赖项时,出现 Class Not Found 错误:

java.lang.ClassNotFoundException: org.spark_project.protobuf.GeneratedMessage
  at java.net.URLClassLoader.run(URLClassLoader.java:366)
  at java.net.URLClassLoader.run(URLClassLoader.java:355)
  at java.security.AccessController.doPrivileged(Native Method)
  at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
  at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
  at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308)
  at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
  at java.lang.Class.forName0(Native Method)
  at java.lang.Class.forName(Class.java:270)
  at akka.actor.ReflectiveDynamicAccess$$anonfun$getClassFor.apply(DynamicAccess.scala:67)
  at akka.actor.ReflectiveDynamicAccess$$anonfun$getClassFor.apply(DynamicAccess.scala:66)
  at scala.util.Try$.apply(Try.scala:161)
  at akka.actor.ReflectiveDynamicAccess.getClassFor(DynamicAccess.scala:66)
  at akka.serialization.Serialization$$anonfun.apply(Serialization.scala:181)
  at akka.serialization.Serialization$$anonfun.apply(Serialization.scala:181)
  at scala.collection.TraversableLike$WithFilter$$anonfun$map.apply(TraversableLike.scala:722)
  at scala.collection.immutable.HashMap$HashMap1.foreach(HashMap.scala:224)
  at scala.collection.immutable.HashMap$HashTrieMap.foreach(HashMap.scala:403)
  at scala.collection.TraversableLike$WithFilter.map(TraversableLike.scala:721)
  at akka.serialization.Serialization.<init>(Serialization.scala:181)
  at akka.serialization.SerializationExtension$.createExtension(SerializationExtension.scala:15)
  at akka.serialization.SerializationExtension$.createExtension(SerializationExtension.scala:12)
  at akka.actor.ActorSystemImpl.registerExtension(ActorSystem.scala:713)
  at akka.actor.ExtensionId$class.apply(Extension.scala:79)
  at akka.serialization.SerializationExtension$.apply(SerializationExtension.scala:12)
  at akka.remote.RemoteActorRefProvider.init(RemoteActorRefProvider.scala:175)
  at akka.actor.ActorSystemImpl.liftedTree2(ActorSystem.scala:620)
  at akka.actor.ActorSystemImpl._start$lzycompute(ActorSystem.scala:617)
  at akka.actor.ActorSystemImpl._start(ActorSystem.scala:617)
  at akka.actor.ActorSystemImpl.start(ActorSystem.scala:634)
  at akka.actor.ActorSystem$.apply(ActorSystem.scala:142)
  at akka.actor.ActorSystem$.apply(ActorSystem.scala:119)
  at org.apache.spark.util.AkkaUtils$.org$apache$spark$util$AkkaUtils$$doCreateActorSystem(AkkaUtils.scala:121)
  at org.apache.spark.util.AkkaUtils$$anonfun.apply(AkkaUtils.scala:53)
  at org.apache.spark.util.AkkaUtils$$anonfun.apply(AkkaUtils.scala:52)
  at org.apache.spark.util.Utils$$anonfun$startServiceOnPort.apply$mcVI$sp(Utils.scala:1920)
  at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:141)
  at org.apache.spark.util.Utils$.startServiceOnPort(Utils.scala:1911)
  at org.apache.spark.util.AkkaUtils$.createActorSystem(AkkaUtils.scala:55)
  at org.apache.spark.rpc.akka.AkkaRpcEnvFactory.create(AkkaRpcEnv.scala:253)
  at org.apache.spark.rpc.RpcEnv$.create(RpcEnv.scala:53)
  at org.apache.spark.SparkEnv$.create(SparkEnv.scala:254)
  at org.apache.spark.SparkEnv$.createDriverEnv(SparkEnv.scala:194)
  at org.apache.spark.SparkContext.createSparkEnv(SparkContext.scala:277)
  at org.apache.spark.SparkContext.<init>(SparkContext.scala:450)
  at org.apache.spark.SparkContext.<init>(SparkContext.scala:162)
  at org.apache.spark.SparkContext.<init>(SparkContext.scala:180)
  at org.apache.spark.api.java.JavaSparkContext.<init>(JavaSparkContext.scala:67)
  at wordCount_test.TestSparkWordCount.setUp(TestSparkWordCount.java:25)
  at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
  at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
  at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
  at java.lang.reflect.Method.invoke(Method.java:606)
  at org.junit.runners.model.FrameworkMethod.runReflectiveCall(FrameworkMethod.java:47)
  at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
  at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:44)
  at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:24)
  at org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
  at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:271)
  at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:70)
  at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:50)
  at org.junit.runners.ParentRunner.run(ParentRunner.java:238)
  at org.junit.runners.ParentRunner.schedule(ParentRunner.java:63)
  at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:236)
  at org.junit.runners.ParentRunner.access$000(ParentRunner.java:53)
  at org.junit.runners.ParentRunner.evaluate(ParentRunner.java:229)
  at org.junit.runners.ParentRunner.run(ParentRunner.java:309)
  at org.eclipse.jdt.internal.junit4.runner.JUnit4TestReference.run(JUnit4TestReference.java:50)
  at org.eclipse.jdt.internal.junit.runner.TestExecution.run(TestExecution.java:38)
  at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.runTests(RemoteTestRunner.java:459)
  at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.runTests(RemoteTestRunner.java:675)
  at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.run(RemoteTestRunner.java:382)
  at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.main(RemoteTestRunner.java:192)

这是我的 pom.xml 文件:

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>wordCount_Spark</groupId>
<artifactId>wordCount_Spark</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>wordCount_Spark</name>
<description>Count words using spark</description>

<dependencies>
<!-- Dependency set that reproduces the ClassNotFoundException for
     org.spark_project.protobuf.GeneratedMessage shown above. The error is thrown
     while JavaSparkContext creates its Akka actor system. -->
<!-- Spark Streaming dependencies -->
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming_2.10</artifactId>
        <version>1.5.2</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_2.10</artifactId>
        <version>1.5.2</version>
        <!-- NOTE(review): this exclusion strips com.google.protobuf:protobuf-java from
             spark-core's transitive dependencies. The post reports that the test passes
             once this exclusion is removed. -->
        <exclusions>
            <exclusion>
                <artifactId>protobuf-java</artifactId>
                <groupId>com.google.protobuf</groupId>
            </exclusion>
        </exclusions>
    </dependency>
    <!-- Explicitly added protobuf artifact; it does not appear in the working dependency
         list later in the post. NOTE(review): presumably intended to supply the
         org.spark_project.protobuf classes, yet the stack trace shows that class is still
         missing. Confirm which packages this jar actually contains. -->
    <dependency>
        <groupId>org.spark-project.protobuf</groupId>
        <artifactId>protobuf-java</artifactId>
        <version>2.5.0-spark</version>
    </dependency>
<!-- Apache Ignite dependencies -->
<!-- All Ignite modules are pinned to the same version (1.4.0). NOTE(review): ignite-spark
     may pull in its own Spark/Akka artifacts transitively; inspect `mvn dependency:tree`
     for conflicting versions. -->
    <dependency>
        <groupId>org.apache.ignite</groupId>
        <artifactId>ignite-core</artifactId>
        <version>1.4.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.ignite</groupId>
        <artifactId>ignite-spring</artifactId>
        <version>1.4.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.ignite</groupId>
        <artifactId>ignite-indexing</artifactId>
        <version>1.4.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.ignite</groupId>
        <artifactId>ignite-log4j</artifactId>
        <version>1.4.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.ignite</groupId>
        <artifactId>ignite-spark</artifactId>
        <version>1.4.0</version>
    </dependency>
    <!-- The test scope is commented out, so Maven applies the default (compile) scope
         and JUnit ends up on the main classpath. -->
    <dependency>
        <groupId>junit</groupId>
        <artifactId>junit</artifactId>
        <version>4.0</version>
        <!--  <scope>test</scope> -->
    </dependency>
</dependencies>

<build>
    <plugins>
        <!-- Compile sources for Java 7 (source and target level 1.7), with incremental
             compilation disabled. -->
        <plugin>
            <artifactId>maven-compiler-plugin</artifactId>
            <version>3.3</version>
            <configuration>
                <source>1.7</source>
                <target>1.7</target>
                <useIncrementalCompilation>false</useIncrementalCompilation>
            </configuration>
        </plugin>
    </plugins>
</build> 
<!-- NOTE(review): the <project> element opened at the top of this POM is never closed in
     this excerpt; a closing </project> tag is required for a valid POM. -->

我已经尝试在 POM 中调换 Spark 和 Ignite 依赖项的顺序,但运行 JUnit 测试时仍然会抛出该错误。这是我正在运行的测试:
package wordCount_test;

import static org.junit.Assert.*;

import java.io.File;

import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;

public class TestSparkWordCount {

  /** Spark context for the current test; created in {@link #setUp()}, released in {@link #tearDown()}. */
  JavaSparkContext jsc;

  /** Handle to the "AIW_WordCount" path, which is cleared before every test. */
  File txtFile;

  /**
   * Creates a JavaSparkContext against master {@code local[2]} and removes any
   * leftover "AIW_WordCount" file from an earlier run.
   *
   * <p>NOTE(review): the result of {@link File#delete()} is ignored, so a failed
   * delete (for example on a non-empty directory) goes unnoticed.
   */
  @Before
  public void setUp() throws Exception {
    final String master = "local[2]";
    final String appName = "testSparkWordCount";
    jsc = new JavaSparkContext(master, appName);

    txtFile = new File("AIW_WordCount");
    if (!txtFile.exists()) {
      return;
    }
    txtFile.delete();
  }

  /** Stops the Spark context, if one was created, and clears the field. */
  @After
  public void tearDown() throws Exception {
    if (jsc == null) {
      return;
    }
    jsc.stop();
    jsc = null;
  }

  /** Checks that the Java wrapper exposes a non-null underlying SparkContext. */
  @Test
  public void testSparkInit() {
    assertNotNull(jsc.sc());
  }
}

是否可以在同一个项目中同时使用 Ignite 和 Spark Streaming?我缺少依赖项吗?我做错了什么或错过了一步吗?

如果您能提供任何帮助或指导,我将不胜感激!

如果我删除 spark-core 依赖中对 com.google.protobuf 工件(protobuf-java)的排除,该测试就能通过;在下面的依赖项列表中,显式声明的 org.spark-project.protobuf 依赖也已一并删除。

我当前(可正常运行)的依赖项列表如下:

<dependencies>
    <!-- Working dependency set. Compared with the failing POM, spark-core no longer
         excludes com.google.protobuf:protobuf-java, and the explicit
         org.spark-project.protobuf:protobuf-java dependency has been removed. -->
    <!-- Spark Streaming dependencies -->
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming_2.10</artifactId>
        <version>1.5.2</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_2.10</artifactId>
        <version>1.5.2</version>
    </dependency>
    <!-- Apache Ignite dependencies -->
    <!-- All Ignite modules are pinned to the same version (1.4.0). -->
    <dependency>
        <groupId>org.apache.ignite</groupId>
        <artifactId>ignite-core</artifactId>
        <version>1.4.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.ignite</groupId>
        <artifactId>ignite-spring</artifactId>
        <version>1.4.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.ignite</groupId>
        <artifactId>ignite-indexing</artifactId>
        <version>1.4.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.ignite</groupId>
        <artifactId>ignite-log4j</artifactId>
        <version>1.4.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.ignite</groupId>
        <artifactId>ignite-spark</artifactId>
        <version>1.4.0</version>
    </dependency>
    <!-- The test scope is commented out, so Maven applies the default (compile) scope
         and JUnit ends up on the main classpath. -->
    <dependency>
        <groupId>junit</groupId>
        <artifactId>junit</artifactId>
        <version>4.0</version>
        <!--  <scope>test</scope> -->
    </dependency>
</dependencies>