Can't write to MongoDB after mapping in Spark Scala
I'm having trouble writing data to MongoDB after reading and mapping it.
I'm using Spark 1.4.0, Scala 2.11.7 and MongoDB 2.6.10.
This is the script I use to run the program:
#!/usr/bin/env bash
SPARK_PATH="/Users/username/spark-1.4.0-bin-hadoop2.6/bin/spark-submit"
CLASS_NAME="com.knx.conversion.ScalaWordCount"
CLUSTER='local[2]'
JARS="/Users/username/spark-1.4.0-bin-hadoop2.6/lib/mongo-hadoop-core-1.4.0.jar,/Users/username/spark-1.4.0-bin-hadoop2.6/lib/mongo-java-driver-3.0.3.jar"
JAR="/Users/username/AggragateConversionFunnel/target/scala-2.11/aggragateconversionfunnel_2.11-1.0.jar"
PROJECT_PATH="/Users/username/AggragateConversionFunnel"
cd ${PROJECT_PATH} && sbt package
${SPARK_PATH} --class ${CLASS_NAME} --master ${CLUSTER} --jars ${JARS} $JAR
Here is the main program. It is copied from [here][1], with only the input and output collections changed.
package com.knx.conversion

import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.hadoop.conf.Configuration
import org.bson.BSONObject
import org.bson.BasicBSONObject

object ScalaWordCount {
  def main(args: Array[String]) {
    val sc = new SparkContext("local", "Scala Word Count")

    val config = new Configuration()
    config.set("mongo.input.uri", "mongodb://127.0.0.1:27017/first-week.interactions")
    config.set("mongo.output.uri", "mongodb://127.0.0.1:27017/visit_06_2015.output")

    val mongoRDD = sc.newAPIHadoopRDD(config, classOf[com.mongodb.hadoop.MongoInputFormat], classOf[Object], classOf[BSONObject])

    // Input contains tuples of (ObjectId, BSONObject)
    // Output contains tuples of (null, BSONObject) - ObjectId will be generated by Mongo driver if null
    val countsRDD = mongoRDD.flatMap(arg => {
        val str = arg._2.get("referer").toString
        str.split("h")
      })
      .map(word => (word, 1))
      .reduceByKey((a, b) => a + b)

    countsRDD.foreach(println)

    val saveRDD = countsRDD.map((tuple) => {
      val bson = new BasicBSONObject()
      bson.put("word", tuple._1)
      bson.put("count", tuple._2.toString)
      (null, bson)
    })

    // Only MongoOutputFormat and config are relevant
    saveRDD.saveAsNewAPIHadoopFile("file:///bogus", classOf[Any], classOf[Any], classOf[com.mongodb.hadoop.MongoOutputFormat[Any, Any]], config)
  }
}
When I run it, I get this error:
15/07/24 15:53:03 INFO DAGScheduler: Job 0 finished: foreach at ScalaWordCount.scala:39, took 1.111442 s
Exception in thread "main" java.lang.NoSuchMethodError: scala.Predef$.$conforms()Lscala/Predef$$less$colon$less;
at com.knx.conversion.ScalaWordCount$.main(ScalaWordCount.scala:48)
at com.knx.conversion.ScalaWordCount.main(ScalaWordCount.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:497)
at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:664)
at org.apache.spark.deploy.SparkSubmit$.doRunMain(SparkSubmit.scala:169)
at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:192)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:111)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
15/07/24 15:53:03 INFO SparkContext: Invoking stop() from shutdown hook
I don't know why this happens or what causes it.
[1]: https://github.com/plaa/mongo-spark/blob/master/src/main/scala/ScalaWordCount.scala
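The problem is that the Scala version I was compiling with does not match the Scala version Spark was built with.
I was compiling and packaging the jar with Scala 2.11.7, but the prebuilt Spark 1.4.0 distribution uses Scala 2.10.4.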
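The `NoSuchMethodError` fits this explanation: `Predef.$conforms` exists in the Scala 2.11 standard library but not in 2.10, so code compiled with 2.11 can reference a method that is missing from the 2.10 library Spark puts on the classpath. As a rough way to confirm which Scala library is actually in use at runtime, something like the following could be submitted with `spark-submit`. The object name `ScalaVersionCheck` is hypothetical and not part of the original program.

```scala
// Hypothetical diagnostic object, not part of the original program.
object ScalaVersionCheck {
  // Version string of the scala-library found on the runtime classpath.
  // Under spark-submit this is the library bundled with the Spark distribution,
  // not necessarily the one the jar was compiled against.
  def runtimeScalaVersion: String = scala.util.Properties.versionString

  def main(args: Array[String]): Unit = {
    println("Runtime Scala library: " + runtimeScalaVersion)
  }
}
```

If the printed version differs from the `scalaVersion` used by sbt to build the jar, the two are binary incompatible and errors like the one above are to be expected.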
I found the answer here.
I then solved the problem by switching the Scala version to 2.10.4.
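For reference, a minimal `build.sbt` sketch of that switch might look like the following. The project name and version are inferred from the jar file name in the script above, and the dependency list is an assumption (the original build file is not shown), so adjust both to match the real project.

```scala
// build.sbt (sketch; the original build file is not shown in the question)

// Assumed from the jar name aggragateconversionfunnel_2.11-1.0.jar
name := "AggragateConversionFunnel"

version := "1.0"

// Match the Scala version the prebuilt Spark 1.4.0 distribution was compiled with
scalaVersion := "2.10.4"

// Assumed dependencies. "provided" because spark-submit supplies Spark itself,
// and the MongoDB jars are passed at runtime via --jars in the script.
libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core" % "1.4.0" % "provided",
  "org.mongodb.mongo-hadoop" % "mongo-hadoop-core" % "1.4.0" % "provided",
  "org.mongodb" % "mongo-java-driver" % "3.0.3" % "provided"
)
```

With this change, `sbt package` writes the jar under `target/scala-2.10/` with a `_2.10` suffix, so the `JAR` variable in the run script has to point to `target/scala-2.10/aggragateconversionfunnel_2.10-1.0.jar` instead of the `scala-2.11` path.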