Class Not Found Error when Integrating Spark Streaming and Apache Ignite
I am trying to integrate Apache Ignite into an existing Spark Streaming project, written in Java, that counts the words in a text file. However, when I add the ignite-spark dependency, I get a Class Not Found error:
java.lang.ClassNotFoundException: org.spark_project.protobuf.GeneratedMessage
at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
at java.security.AccessController.doPrivileged(Native Method)
at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308)
at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
at java.lang.Class.forName0(Native Method)
at java.lang.Class.forName(Class.java:270)
at akka.actor.ReflectiveDynamicAccess$$anonfun$getClassFor.apply(DynamicAccess.scala:67)
at akka.actor.ReflectiveDynamicAccess$$anonfun$getClassFor.apply(DynamicAccess.scala:66)
at scala.util.Try$.apply(Try.scala:161)
at akka.actor.ReflectiveDynamicAccess.getClassFor(DynamicAccess.scala:66)
at akka.serialization.Serialization$$anonfun.apply(Serialization.scala:181)
at akka.serialization.Serialization$$anonfun.apply(Serialization.scala:181)
at scala.collection.TraversableLike$WithFilter$$anonfun$map.apply(TraversableLike.scala:722)
at scala.collection.immutable.HashMap$HashMap1.foreach(HashMap.scala:224)
at scala.collection.immutable.HashMap$HashTrieMap.foreach(HashMap.scala:403)
at scala.collection.TraversableLike$WithFilter.map(TraversableLike.scala:721)
at akka.serialization.Serialization.<init>(Serialization.scala:181)
at akka.serialization.SerializationExtension$.createExtension(SerializationExtension.scala:15)
at akka.serialization.SerializationExtension$.createExtension(SerializationExtension.scala:12)
at akka.actor.ActorSystemImpl.registerExtension(ActorSystem.scala:713)
at akka.actor.ExtensionId$class.apply(Extension.scala:79)
at akka.serialization.SerializationExtension$.apply(SerializationExtension.scala:12)
at akka.remote.RemoteActorRefProvider.init(RemoteActorRefProvider.scala:175)
at akka.actor.ActorSystemImpl.liftedTree2(ActorSystem.scala:620)
at akka.actor.ActorSystemImpl._start$lzycompute(ActorSystem.scala:617)
at akka.actor.ActorSystemImpl._start(ActorSystem.scala:617)
at akka.actor.ActorSystemImpl.start(ActorSystem.scala:634)
at akka.actor.ActorSystem$.apply(ActorSystem.scala:142)
at akka.actor.ActorSystem$.apply(ActorSystem.scala:119)
at org.apache.spark.util.AkkaUtils$.org$apache$spark$util$AkkaUtils$$doCreateActorSystem(AkkaUtils.scala:121)
at org.apache.spark.util.AkkaUtils$$anonfun.apply(AkkaUtils.scala:53)
at org.apache.spark.util.AkkaUtils$$anonfun.apply(AkkaUtils.scala:52)
at org.apache.spark.util.Utils$$anonfun$startServiceOnPort.apply$mcVI$sp(Utils.scala:1920)
at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:141)
at org.apache.spark.util.Utils$.startServiceOnPort(Utils.scala:1911)
at org.apache.spark.util.AkkaUtils$.createActorSystem(AkkaUtils.scala:55)
at org.apache.spark.rpc.akka.AkkaRpcEnvFactory.create(AkkaRpcEnv.scala:253)
at org.apache.spark.rpc.RpcEnv$.create(RpcEnv.scala:53)
at org.apache.spark.SparkEnv$.create(SparkEnv.scala:254)
at org.apache.spark.SparkEnv$.createDriverEnv(SparkEnv.scala:194)
at org.apache.spark.SparkContext.createSparkEnv(SparkContext.scala:277)
at org.apache.spark.SparkContext.<init>(SparkContext.scala:450)
at org.apache.spark.SparkContext.<init>(SparkContext.scala:162)
at org.apache.spark.SparkContext.<init>(SparkContext.scala:180)
at org.apache.spark.api.java.JavaSparkContext.<init>(JavaSparkContext.scala:67)
at wordCount_test.TestSparkWordCount.setUp(TestSparkWordCount.java:25)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at org.junit.runners.model.FrameworkMethod.runReflectiveCall(FrameworkMethod.java:47)
at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:44)
at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:24)
at org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:271)
at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:70)
at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:50)
at org.junit.runners.ParentRunner.run(ParentRunner.java:238)
at org.junit.runners.ParentRunner.schedule(ParentRunner.java:63)
at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:236)
at org.junit.runners.ParentRunner.access$000(ParentRunner.java:53)
at org.junit.runners.ParentRunner.evaluate(ParentRunner.java:229)
at org.junit.runners.ParentRunner.run(ParentRunner.java:309)
at org.eclipse.jdt.internal.junit4.runner.JUnit4TestReference.run(JUnit4TestReference.java:50)
at org.eclipse.jdt.internal.junit.runner.TestExecution.run(TestExecution.java:38)
at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.runTests(RemoteTestRunner.java:459)
at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.runTests(RemoteTestRunner.java:675)
at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.run(RemoteTestRunner.java:382)
at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.main(RemoteTestRunner.java:192)
Here is my pom.xml file:
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <groupId>wordCount_Spark</groupId>
  <artifactId>wordCount_Spark</artifactId>
  <version>0.0.1-SNAPSHOT</version>
  <name>wordCount_Spark</name>
  <description>Count words using spark</description>

  <dependencies>
    <!-- Spark Streaming dependencies -->
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-streaming_2.10</artifactId>
      <version>1.5.2</version>
    </dependency>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-core_2.10</artifactId>
      <version>1.5.2</version>
      <exclusions>
        <exclusion>
          <artifactId>protobuf-java</artifactId>
          <groupId>com.google.protobuf</groupId>
        </exclusion>
      </exclusions>
    </dependency>
    <dependency>
      <groupId>org.spark-project.protobuf</groupId>
      <artifactId>protobuf-java</artifactId>
      <version>2.5.0-spark</version>
    </dependency>

    <!-- Apache Ignite dependencies -->
    <dependency>
      <groupId>org.apache.ignite</groupId>
      <artifactId>ignite-core</artifactId>
      <version>1.4.0</version>
    </dependency>
    <dependency>
      <groupId>org.apache.ignite</groupId>
      <artifactId>ignite-spring</artifactId>
      <version>1.4.0</version>
    </dependency>
    <dependency>
      <groupId>org.apache.ignite</groupId>
      <artifactId>ignite-indexing</artifactId>
      <version>1.4.0</version>
    </dependency>
    <dependency>
      <groupId>org.apache.ignite</groupId>
      <artifactId>ignite-log4j</artifactId>
      <version>1.4.0</version>
    </dependency>
    <dependency>
      <groupId>org.apache.ignite</groupId>
      <artifactId>ignite-spark</artifactId>
      <version>1.4.0</version>
    </dependency>
    <dependency>
      <groupId>junit</groupId>
      <artifactId>junit</artifactId>
      <version>4.0</version>
      <!-- <scope>test</scope> -->
    </dependency>
  </dependencies>

  <build>
    <plugins>
      <plugin>
        <artifactId>maven-compiler-plugin</artifactId>
        <version>3.3</version>
        <configuration>
          <source>1.7</source>
          <target>1.7</target>
          <useIncrementalCompilation>false</useIncrementalCompilation>
        </configuration>
      </plugin>
    </plugins>
  </build>
</project>
I have already tried switching the order of the Spark and Ignite dependencies in the POM, but it still throws the error when I try to run my JUnit test. Here is the test I'm running:
package wordCount_test;

import static org.junit.Assert.*;

import java.io.File;

import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;

public class TestSparkWordCount {

    JavaSparkContext jsc;
    File txtFile;

    @Before
    public void setUp() throws Exception {
        jsc = new JavaSparkContext("local[2]", "testSparkWordCount");
        txtFile = new File("AIW_WordCount");
        if (txtFile.exists()) {
            txtFile.delete();
        }
    }

    @After
    public void tearDown() throws Exception {
        if (jsc != null) {
            jsc.stop();
            jsc = null;
        }
    }

    @Test
    public void testSparkInit() {
        assertNotNull(jsc.sc());
    }
}
Is it possible to use Ignite and Spark Streaming together in the same project? Am I missing a dependency? Am I doing something wrong or skipping a step?
Any help or guidance would be greatly appreciated!
The test works for me if I remove the exclusion of the com.google.protobuf artifact.
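If you want to see what actually ends up on the classpath, Maven's resolved dependency tree makes conflicts like this visible. This is the standard dependency plugin; the (optional) -Dincludes filter below just narrows the output to artifacts named protobuf-java:

mvn dependency:tree -Dincludes=:protobuf-java

With the exclusion removed, you should see both com.google.protobuf:protobuf-java and the shaded org.spark-project.protobuf:protobuf-java (the artifact containing the GeneratedMessage class that is missing in your trace) resolved transitively through Spark.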
My current dependency list looks like this:
<dependencies>
  <!-- Spark Streaming dependencies -->
  <dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming_2.10</artifactId>
    <version>1.5.2</version>
  </dependency>
  <dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-core_2.10</artifactId>
    <version>1.5.2</version>
  </dependency>

  <!-- Apache Ignite dependencies -->
  <dependency>
    <groupId>org.apache.ignite</groupId>
    <artifactId>ignite-core</artifactId>
    <version>1.4.0</version>
  </dependency>
  <dependency>
    <groupId>org.apache.ignite</groupId>
    <artifactId>ignite-spring</artifactId>
    <version>1.4.0</version>
  </dependency>
  <dependency>
    <groupId>org.apache.ignite</groupId>
    <artifactId>ignite-indexing</artifactId>
    <version>1.4.0</version>
  </dependency>
  <dependency>
    <groupId>org.apache.ignite</groupId>
    <artifactId>ignite-log4j</artifactId>
    <version>1.4.0</version>
  </dependency>
  <dependency>
    <groupId>org.apache.ignite</groupId>
    <artifactId>ignite-spark</artifactId>
    <version>1.4.0</version>
  </dependency>
  <dependency>
    <groupId>junit</groupId>
    <artifactId>junit</artifactId>
    <version>4.0</version>
    <!-- <scope>test</scope> -->
  </dependency>
</dependencies>
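As for the broader question: yes, Ignite and Spark Streaming can be used in the same project, and the ignite-spark module is the bridge between them. Here is a minimal sketch of what that looks like once the dependencies resolve, assuming the JavaIgniteContext / JavaIgniteRDD API from the ignite-spark module; the Spring config path and cache name are placeholders, not values from your project:

import org.apache.ignite.spark.JavaIgniteContext;
import org.apache.ignite.spark.JavaIgniteRDD;
import org.apache.spark.api.java.JavaSparkContext;

public class IgniteSparkSketch {

    public static void main(String[] args) {
        JavaSparkContext jsc = new JavaSparkContext("local[2]", "igniteSparkSketch");

        // Starts (or connects to) an Ignite node on the driver and on each
        // Spark executor, configured from a Spring XML file.
        JavaIgniteContext<String, Integer> ic =
                new JavaIgniteContext<>(jsc, "ignite-config.xml");

        // A JavaIgniteRDD is a live Spark view over an Ignite cache and can
        // be read, joined, and counted like any other pair RDD.
        JavaIgniteRDD<String, Integer> wordCounts = ic.fromCache("wordCounts");
        System.out.println("Entries in cache: " + wordCounts.count());

        jsc.stop();
    }
}

One more thing worth double-checking: your Spark artifacts are the Scala 2.10 builds (spark-core_2.10, spark-streaming_2.10), so make sure the ignite-spark artifact you depend on was built against the same Scala version, or you may trade this classpath problem for another one.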