ZeroMQ word count app gives error when you compile in spark 1.2.1

I am trying to set up a ZeroMQ data stream into Spark. Basically, I took the ZeroMQWordCount.scala example application and tried to recompile and run it.

I have zeromq 2.1 and Spark 1.2.1 installed. Here is my Scala code:

package org.apache.spark.examples.streaming

import akka.actor.ActorSystem
import akka.actor.actorRef2Scala
import akka.zeromq._
import akka.zeromq.Subscribe
import akka.util.ByteString

import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.StreamingContext._
import org.apache.spark.streaming.zeromq._

import scala.language.implicitConversions
import org.apache.spark.SparkConf

object ZmqBenchmark {
  def main(args: Array[String]) {
    if (args.length < 2) {
      System.err.println("Usage: ZmqBenchmark <zeroMQurl> <topic>")
      System.exit(1)
    }
    //StreamingExamples.setStreamingLogLevels()
    val Seq(url, topic) = args.toSeq
    val sparkConf = new SparkConf().setAppName("ZmqBenchmark")
    // Create the context and set the batch size
    val ssc = new StreamingContext(sparkConf, Seconds(2))

    def bytesToStringIterator(x: Seq[ByteString]) = (x.map(_.utf8String)).iterator

    // For this stream, a zeroMQ publisher should be running.
    val lines = ZeroMQUtils.createStream(ssc, url, Subscribe(topic), bytesToStringIterator _)
    val words = lines.flatMap(_.split(" "))
    val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
    wordCounts.print()
    ssc.start()
    ssc.awaitTermination()
  }
}

Here is my sbt build file with the dependencies:

name := "ZmqBenchmark"

version := "1.0"

scalaVersion := "2.10.4"

resolvers += "Typesafe Repository" at "http://repo.typesafe.com/typesafe/releases/"

resolvers += "Sonatype (releases)" at "https://oss.sonatype.org/content/repositories/releases/"

libraryDependencies += "org.apache.spark" % "spark-core_2.10" % "1.2.1"

libraryDependencies += "org.apache.spark"  %% "spark-streaming" % "1.2.1"

libraryDependencies += "org.apache.spark" % "spark-streaming-zeromq_2.10" % "1.2.1"

libraryDependencies += "com.typesafe.akka" %% "akka-actor" % "2.2.0"

libraryDependencies += "org.zeromq" %% "zeromq-scala-binding" % "0.0.6"

libraryDependencies += "com.typesafe.akka" % "akka-zeromq_2.10.0-RC5" % "2.1.0-RC6"

libraryDependencies += "org.apache.spark" % "spark-examples_2.10" % "1.1.1"

libraryDependencies += "org.spark-project.zeromq" % "zeromq-scala-binding_2.11" % "0.0.7-spark"

The application compiles with `sbt package` without any errors, but when I run the application with spark-submit, I get an error:

zaid@zaid-VirtualBox:~/spark-1.2.1$ ./bin/spark-submit --master local[*] ./zeromqsub/example/target/scala-2.10/zmqbenchmark_2.10-1.0.jar tcp://127.0.0.1:5553 hello
15/03/06 10:21:11 WARN Utils: Your hostname, zaid-VirtualBox resolves to a loopback address: 127.0.1.1; using 192.168.220.175 instead (on interface eth0)
15/03/06 10:21:11 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Exception in thread "main" java.lang.NoClassDefFoundError: org/apache/spark/streaming/zeromq/ZeroMQUtils$
    at ZmqBenchmark$.main(ZmqBenchmark.scala:78)
    at ZmqBenchmark.main(ZmqBenchmark.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:358)
    at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:75)
    at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.lang.ClassNotFoundException: org.apache.spark.streaming.zeromq.ZeroMQUtils$
    at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
    at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
    at java.security.AccessController.doPrivileged(Native Method)
    at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
    ... 9 more

Any idea why this happens? I know the application should work, because when I run the same example with the run-example script and point it at the ZeroMQWordCount app that ships with Spark, it runs without exceptions. My guess is that my sbt file is incorrect. What else do I need to include in the sbt file?

Thanks

You are using ZeroMQUtils.createStream, but the line

Caused by: java.lang.ClassNotFoundException: org.apache.spark.streaming.zeromq.ZeroMQUtils

indicates that the bytecode for ZeroMQUtils cannot be found. When the Spark examples are run, they are run against a jar file that contains the ZeroMQUtils class (such as spark-1.2.1/examples/target/scala-2.10/spark-examples-1.2.1-hadoop1.0.4.jar). One solution is to use the --jars flag so that the spark-submit command can find the bytecode. In your case, this could be something like

spark-submit --jars /opt/spark/spark-1.2.1/examples/target/scala-2.10/spark-examples-1.2.1-hadoop1.0.4.jar --master local[*] ./zeromqsub/example/target/scala-2.10/zmqbenchmark_2.10-1.0.jar tcp://127.0.0.1:5553 hello

assuming you have installed spark-1.2.1 in /opt/spark.
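Alternatively, instead of passing --jars on every submit, you could bundle spark-streaming-zeromq and its transitive dependencies into your application jar with the sbt-assembly plugin. A minimal sketch, with the caveat that the plugin version and the "provided" scoping below are assumptions (not taken from your build), so check them against your sbt version:

```scala
// project/plugins.sbt — plugin version is an assumption; pick the
// sbt-assembly release that matches your sbt version
addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.11.2")

// build.sbt additions (older sbt-assembly releases also require adding
// `assemblySettings` to the build): mark the Spark core/streaming artifacts
// "provided" so spark-submit's own classes are not duplicated in the fat
// jar, but leave spark-streaming-zeromq unscoped so that ZeroMQUtils and
// its dependencies end up inside your jar
libraryDependencies += "org.apache.spark" %% "spark-core" % "1.2.1" % "provided"
libraryDependencies += "org.apache.spark" %% "spark-streaming" % "1.2.1" % "provided"
libraryDependencies += "org.apache.spark" % "spark-streaming-zeromq_2.10" % "1.2.1"
```

Then `sbt assembly` produces a single jar that you can pass to spark-submit without --jars. Either way, you can check whether a given jar actually contains the missing class with `jar tf <jar> | grep ZeroMQUtils`.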