SparkException: Default partitioner cannot partition array keys
I wrote the following program using Spark Streaming:
import java.util.Properties

import scala.io.Source

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.twitter.TwitterUtils

object TrendingHashTags {
  def main(args: Array[String]): Unit = {
    val url = getClass.getResource("/twitterapi.properties")
    val source = Source.fromURL(url)
    val props = new Properties()
    props.load(source.bufferedReader())
    System.setProperty("twitter4j.oauth.consumerKey", props.get("consumer_key").toString)
    System.setProperty("twitter4j.oauth.consumerSecret", props.get("consumer_secret").toString)
    System.setProperty("twitter4j.oauth.accessToken", props.get("access_token").toString)
    System.setProperty("twitter4j.oauth.accessTokenSecret", props.get("access_token_secret").toString)
    val conf = new SparkConf().setAppName("Abhishek Spark Streaming")
    val ssc = new StreamingContext(conf, Seconds(30))
    ssc.checkpoint("checkpoint")
    val tweets = TwitterUtils.createStream(ssc, None)
    val tweetsByLang = tweets.filter(tweet => tweet.getLang == "en-US")
    val tweetText = tweetsByLang.map(t => t.getText)
    val words = tweetText.map(t => t.split("\\s+"))
    val hashTags = words.filter(w => w.startsWith("#")).map(h => (h, 1))
    val tagsWithCounts = hashTags.updateStateByKey {
      (counts: Seq[Int], prevCount: Option[Int]) => prevCount.map(c => c + counts.sum).orElse(Some(counts.sum))
    }
    val topHashTags = tagsWithCounts.filter {
      case (t, c) => c > 10
    }
    val sortedTopHashTags = topHashTags.transform {
      rdd => rdd.sortBy({
        case (w, c) => c
      }, false)
    }
    sortedTopHashTags.print(10)
    ssc.start()
    ssc.awaitTermination()
  }
}
But when I run this program with spark-submit:
spark-submit --class com.abhi.TrendingHashTags --master yarn Foo.jar
I get the following error:
16/05/12 04:18:00 ERROR scheduler.JobScheduler: Error generating jobs for time 1463026680000 ms
org.apache.spark.SparkException: Default partitioner cannot partition array keys.
at org.apache.spark.rdd.PairRDDFunctions$$anonfun$combineByKeyWithClassTag.apply(PairRDDFunctions.scala:89)
at org.apache.spark.rdd.PairRDDFunctions$$anonfun$combineByKeyWithClassTag.apply(PairRDDFunctions.scala:82)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:111)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:316)
at org.apache.spark.rdd.PairRDDFunctions.combineByKeyWithClassTag(PairRDDFunctions.scala:82)
at org.apache.spark.rdd.PairRDDFunctions$$anonfun$groupByKey.apply(PairRDDFunctions.scala:506)
at org.apache.spark.rdd.PairRDDFunctions$$anonfun$groupByKey.apply(PairRDDFunctions.scala:499)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:111)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:316)
at org.apache.spark.rdd.PairRDDFunctions.groupByKey(PairRDDFunctions.scala:499)
at org.apache.spark.streaming.dstream.StateDStream.compute(StateDStream.scala:105)
at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$$anonfun$$anonfun$apply.apply(DStream.scala:352)
at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$$anonfun$$anonfun$apply.apply(DStream.scala:352)
at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$$anonfun.apply(DStream.scala:351)
at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$$anonfun.apply(DStream.scala:351)
at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:426)
at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute.apply(DStream.scala:346)
at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute.apply(DStream.scala:344)
at scala.Option.orElse(Option.scala:257)
at org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:341)
at org.apache.spark.streaming.dstream.FilteredDStream.compute(FilteredDStream.scala:35)
at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$$anonfun$$anonfun$apply.apply(DStream.scala:352)
at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$$anonfun$$anonfun$apply.apply(DStream.scala:352)
at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$$anonfun.apply(DStream.scala:351)
at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$$anonfun.apply(DStream.scala:351)
at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:426)
at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute.apply(DStream.scala:346)
at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute.apply(DStream.scala:344)
at scala.Option.orElse(Option.scala:257)
at org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:341)
at org.apache.spark.streaming.dstream.TransformedDStream$$anonfun.apply(TransformedDStream.scala:42)
at org.apache.spark.streaming.dstream.TransformedDStream$$anonfun.apply(TransformedDStream.scala:42)
at scala.collection.TraversableLike$$anonfun$map.apply(TraversableLike.scala:244)
at scala.collection.TraversableLike$$anonfun$map.apply(TraversableLike.scala:244)
at scala.collection.immutable.List.foreach(List.scala:318)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
at scala.collection.AbstractTraversable.map(Traversable.scala:105)
at org.apache.spark.streaming.dstream.TransformedDStream.compute(TransformedDStream.scala:42)
at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$$anonfun$$anonfun$apply.apply(DStream.scala:352)
at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$$anonfun$$anonfun$apply.apply(DStream.scala:352)
at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$$anonfun.apply(DStream.scala:351)
at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$$anonfun.apply(DStream.scala:351)
at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:426)
at org.apache.spark.streaming.dstream.TransformedDStream.createRDDWithLocalProperties(TransformedDStream.scala:65)
at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute.apply(DStream.scala:346)
at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute.apply(DStream.scala:344)
at scala.Option.orElse(Option.scala:257)
at org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:341)
at org.apache.spark.streaming.dstream.ForEachDStream.generateJob(ForEachDStream.scala:47)
at org.apache.spark.streaming.DStreamGraph$$anonfun.apply(DStreamGraph.scala:115)
at org.apache.spark.streaming.DStreamGraph$$anonfun.apply(DStreamGraph.scala:114)
at scala.collection.TraversableLike$$anonfun$flatMap.apply(TraversableLike.scala:251)
at scala.collection.TraversableLike$$anonfun$flatMap.apply(TraversableLike.scala:251)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:251)
at scala.collection.AbstractTraversable.flatMap(Traversable.scala:105)
at org.apache.spark.streaming.DStreamGraph.generateJobs(DStreamGraph.scala:114)
at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun.apply(JobGenerator.scala:248)
at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun.apply(JobGenerator.scala:246)
at scala.util.Try$.apply(Try.scala:161)
at org.apache.spark.streaming.scheduler.JobGenerator.generateJobs(JobGenerator.scala:246)
at org.apache.spark.streaming.scheduler.JobGenerator.org$apache$spark$streaming$scheduler$JobGenerator$$processEvent(JobGenerator.scala:181)
at org.apache.spark.streaming.scheduler.JobGenerator$$anon.onReceive(JobGenerator.scala:87)
at org.apache.spark.streaming.scheduler.JobGenerator$$anon.onReceive(JobGenerator.scala:86)
at org.apache.spark.util.EventLoop$$anon.run(EventLoop.scala:48)
Does anyone know what is going wrong?
The exception is quite clear: an array is being used as the key in a key-based operation, and the code shows this was not the intention. Spark's default HashPartitioner refuses array keys because Java/Scala arrays compare and hash by reference, so two arrays with identical contents would not necessarily land in the same partition (see the small equality check after the listing below).
If we write out the DStream types, we can see where things go wrong:
val tweets: DStream[Status] = TwitterUtils.createStream(ssc, None)
val tweetsByLang: DStream[Status] = tweets.filter(tweet => tweet.getLang == "en-US")
val tweetText: DStream[String] = tweetsByLang.map(t => t.getText)
val words: DStream[Array[String]] = tweetText.map(t => t.split("\\s+")) // problem here
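As an aside (my own illustration, not part of the original answer), the reason the default partitioner rejects array keys can be seen from Array's equality semantics:

// Two arrays with identical contents are still different objects.
val a = Array("#spark")
val b = Array("#spark")
a == b             // false: arrays compare by reference
a.sameElements(b)  // true: element-wise comparison
// Because a and b also hash differently, a HashPartitioner could send
// "the same" key to different partitions, so Spark refuses array keys.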
I think what we actually want is a DStream of individual words, for the classic word-count pattern. The fix:
val words: DStream[String] = tweetText.flatMap(t => t.split("\\s+")) // fixed
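For completeness, here is a minimal sketch of how the relevant part of the pipeline lines up once flatMap is used, assuming tweetText and the rest of the setup stay exactly as in the question (variable names are kept from the original code):

import org.apache.spark.streaming.dstream.DStream

// Each element of words is now a single String, so the pair DStream
// below has String keys, which the default partitioner can handle.
val words: DStream[String] = tweetText.flatMap(t => t.split("\\s+"))
val hashTags: DStream[(String, Int)] =
  words.filter(w => w.startsWith("#")).map(h => (h, 1))

// updateStateByKey now partitions on String keys instead of Array keys,
// so the "Default partitioner cannot partition array keys" error disappears.
val tagsWithCounts: DStream[(String, Int)] = hashTags.updateStateByKey {
  (counts: Seq[Int], prevCount: Option[Int]) =>
    prevCount.map(c => c + counts.sum).orElse(Some(counts.sum))
}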