Can't write to mongodb after mapping in Spark Scala

I'm having trouble writing data to MongoDB after reading and mapping it.

Here is the script I use to run the program.

I'm using Spark 1.4.0, Scala 2.11.7, and MongoDB 2.6.10.

    #!/usr/bin/env bash
    SPARK_PATH="/Users/username/spark-1.4.0-bin-hadoop2.6/bin/spark-submit"
    CLASS_NAME="com.knx.conversion.ScalaWordCount"
    CLUSTER='local[2]'
    JARS="/Users/username/spark-1.4.0-bin-hadoop2.6/lib/mongo-hadoop-core-1.4.0.jar,/Users/username/spark-1.4.0-bin-hadoop2.6/lib/mongo-java-driver-3.0.3.jar"
    JAR="/Users/username/AggragateConversionFunnel/target/scala-2.11/aggragateconversionfunnel_2.11-1.0.jar"
    PROJECT_PATH="/Users/username/AggragateConversionFunnel"
    cd ${PROJECT_PATH} && sbt package
    ${SPARK_PATH} --class ${CLASS_NAME} --master ${CLUSTER} --jars ${JARS} $JAR

Here is the main program. It is simply copied from [here][1], with the input and output collections changed.

    package com.knx.conversion

    import org.apache.spark.SparkContext
    import org.apache.spark.SparkContext._
    import org.apache.hadoop.conf.Configuration
    import org.bson.BSONObject
    import org.bson.BasicBSONObject

    object ScalaWordCount {

      def main(args: Array[String]) {

        val sc = new SparkContext("local", "Scala Word Count")

        val config = new Configuration()
        config.set("mongo.input.uri", "mongodb://127.0.0.1:27017/first-week.interactions")
        config.set("mongo.output.uri", "mongodb://127.0.0.1:27017/visit_06_2015.output")

        val mongoRDD = sc.newAPIHadoopRDD(config, classOf[com.mongodb.hadoop.MongoInputFormat], classOf[Object], classOf[BSONObject])

        // Input contains tuples of (ObjectId, BSONObject)
        // Output contains tuples of (null, BSONObject) - ObjectId will be generated by Mongo driver if null
        val countsRDD = mongoRDD.flatMap(arg => {
          val str = arg._2.get("referer").toString
          str.split("h")
        })
          .map(word => (word, 1))
          .reduceByKey((a, b) => a + b)
        countsRDD.foreach(println)

        val saveRDD = countsRDD.map((tuple) => {
          val bson = new BasicBSONObject()
          bson.put("word", tuple._1)
          bson.put("count", tuple._2.toString)
          (null, bson)
        })
        // Only MongoOutputFormat and config are relevant
        saveRDD.saveAsNewAPIHadoopFile("file:///bogus", classOf[Any], classOf[Any], classOf[com.mongodb.hadoop.MongoOutputFormat[Any, Any]], config)

      }
    }

When I run it, I get this error:

    15/07/24 15:53:03 INFO DAGScheduler: Job 0 finished: foreach at ScalaWordCount.scala:39, took 1.111442 s
    Exception in thread "main" java.lang.NoSuchMethodError: scala.Predef$.$conforms()Lscala/Predef$$less$colon$less;
        at com.knx.conversion.ScalaWordCount$.main(ScalaWordCount.scala:48)
        at com.knx.conversion.ScalaWordCount.main(ScalaWordCount.scala)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:497)
        at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:664)
        at org.apache.spark.deploy.SparkSubmit$.doRunMain(SparkSubmit.scala:169)
        at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:192)
        at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:111)
        at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
    15/07/24 15:53:03 INFO SparkContext: Invoking stop() from shutdown hook

I just don't know why this happens or how to fix it.

  [1]: https://github.com/plaa/mongo-spark/blob/master/src/main/scala/ScalaWordCount.scala

This issue was caused by a mismatch between the Scala version I was using and the Scala version Spark was built with. I was compiling and packaging the jar with Scala 2.11.7, but Spark 1.4.1 is built against Scala 2.10.4.
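
For reference, one quick way to confirm which Scala version a given Spark distribution was built against is to print it from that distribution's spark-shell. This is a minimal sketch; `scala.util.Properties.versionString` is a standard Scala library property, not anything Spark-specific:

    // Run inside the spark-shell shipped with the Spark distribution in question.
    // Prints the Scala version the shell was compiled against,
    // e.g. "version 2.10.4" for the prebuilt Spark 1.4.x binaries.
    println(scala.util.Properties.versionString)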

I found the answer here.

I then fixed the problem by switching the Scala version to 2.10.4.
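
In practice that means changing the Scala version in the sbt build and re-packaging. A minimal sketch of what the `build.sbt` could look like (the project name, version, and the library dependency line are assumptions based on the jar path in my script, not my original build file):

    // build.sbt -- hypothetical sketch; the scalaVersion line is the actual fix
    name := "AggragateConversionFunnel"

    version := "1.0"

    // Match the Scala version the prebuilt Spark 1.4.x binaries were built with
    scalaVersion := "2.10.4"

    // Spark itself is supplied by spark-submit at runtime, so mark it "provided"
    libraryDependencies += "org.apache.spark" %% "spark-core" % "1.4.0" % "provided"

After re-running `sbt package`, the jar is produced under `target/scala-2.10/` instead of `target/scala-2.11/`, so the `JAR` path in the submit script has to be updated to match.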