Error in Twitter streaming
I am writing a Twitter connector using Spark Streaming.
I am getting the following exception:
ERROR ReceiverTracker: Deregistered receiver for stream 0: Restarting
receiver with delay 2000ms: Error starting Twitter stream -
java.lang.NullPointerException
at org.apache.spark.streaming.twitter.TwitterReceiver.onStart(TwitterInputDStream.scala:89)
at org.apache.spark.streaming.receiver.ReceiverSupervisor.startReceiver(ReceiverSupervisor.scala:121)
at org.apache.spark.streaming.receiver.ReceiverSupervisor$$anonfun$restartReceiver.apply$mcV$sp(ReceiverSupervisor.scala:159)
at org.apache.spark.streaming.receiver.ReceiverSupervisor$$anonfun$restartReceiver.apply(ReceiverSupervisor.scala:152)
at org.apache.spark.streaming.receiver.ReceiverSupervisor$$anonfun$restartReceiver.apply(ReceiverSupervisor.scala:152)
at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1(Future.scala:24)
at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
at scala.concurrent.impl.ExecutionContextImpl$$anon.exec(ExecutionContextImpl.scala:107)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Here is the relevant code snippet:
val config = new twitter4j.conf.ConfigurationBuilder()
.setOAuthConsumerKey("*********************")
.setOAuthConsumerSecret("**********************************************")
.setOAuthAccessToken("****************************************************")
.setOAuthAccessTokenSecret("**********************************************************")
.build
val twitter_auth = new TwitterFactory(config)
val a = new twitter4j.auth.OAuthAuthorization(config)
val atwitter : Option[twitter4j.auth.Authorization] = Some(twitter_auth.getInstance(a).getAuthorization())
val sparkConf = new SparkConf().setAppName("TwitterPopularTags").setMaster("local[*]")
val ssc = new StreamingContext(sparkConf, Seconds(2))
// ssc.checkpoint("D:/test")
val stream = TwitterUtils.createStream(ssc, atwitter, null, StorageLevel.MEMORY_AND_DISK_2)
val hashTags = stream.map(status => status.getUser().getName())
hashTags.foreachRDD(rdd => {
rdd.foreach(println)
})
ssc.start()
ssc.awaitTermination()
Can anyone help me resolve this?
Thanks :)
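If you go to the line that throws the exception (TwitterInputDStream.scala:89 in the stack trace), the only way for that line to throw an NPE is for the filters to be null, which is exactly what happens when the Twitter stream is created: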
val stream = TwitterUtils.createStream(ssc, atwitter, null, StorageLevel.MEMORY_AND_DISK_2)
The filters argument is a sequence, so initialize it with Seq() instead of null.
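To illustrate why null fails where an empty sequence works, here is a minimal sketch that uses only the Scala standard library. The object NullVsEmptyFilters and the method chooseMode are hypothetical stand-ins made up for this example; they are not the actual TwitterReceiver code, which may inspect its filters differently. The sketch only assumes that the receiver dereferences the sequence it is given.

```scala
import scala.util.Try

object NullVsEmptyFilters {
  // Hypothetical stand-in for a receiver that inspects its filters on start.
  // This is an assumption for illustration, not the real TwitterReceiver logic.
  def chooseMode(filters: Seq[String]): String =
    if (filters.nonEmpty) s"track: ${filters.mkString(", ")}" // filtered stream
    else "sample"                                              // unfiltered stream

  def main(args: Array[String]): Unit = {
    // An empty sequence is a valid value and is handled normally.
    println(chooseMode(Seq()))
    // A non-empty sequence is handled normally as well.
    println(chooseMode(Seq("spark", "scala")))
    // null type-checks as Seq[String], but dereferencing it throws
    // a NullPointerException at runtime; Try captures that failure.
    println(Try(chooseMode(null)).isFailure)
  }
}
```

Applied to the code in the question, the change would be to call TwitterUtils.createStream(ssc, atwitter, Seq(), StorageLevel.MEMORY_AND_DISK_2), leaving the other arguments as they are.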