createStream of tweets in Spark
I am trying to create a stream of tweets in Spark using Scala and Twitter4j.
Here is my code snippet:
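// Imports the snippet relies on
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.twitter.TwitterUtils
import org.apache.spark.storage.StorageLevel.DISK_ONLY_2
import twitter4j.TwitterFactory

// OAuth credentials (left blank here)
object auth {
  val config = new twitter4j.conf.ConfigurationBuilder()
    .setOAuthConsumerKey("")
    .setOAuthConsumerSecret("")
    .setOAuthAccessToken("")
    .setOAuthAccessTokenSecret("")
    .build
}

val conf = new SparkConf().setMaster("local[2]").setAppName("Tutorial")
val ssc = new StreamingContext(conf, Seconds(1))
val twitter_auth = new TwitterFactory(auth.config)
val a = new twitter4j.auth.OAuthAuthorization(auth.config)
// atwitter has type twitter4j.auth.Authorization
val atwitter = twitter_auth.getInstance(a).getAuthorization()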
When I try to call createStream:
val tweets = TwitterUtils.createStream(ssc, atwitter, filters, DISK_ONLY_2)
I get this error:
[error] /home/shaza90/Desktop/streaming/scala/Tutorial.scala:30: overloaded method value createStream with alternatives:
[error] (jssc: org.apache.spark.streaming.api.java.JavaStreamingContext,twitterAuth: twitter4j.auth.Authorization,filters: Array[String],storageLevel: org.apache.spark.storage.StorageLevel)org.apache.spark.streaming.api.java.JavaReceiverInputDStream[twitter4j.Status] <and>
[error] (ssc: org.apache.spark.streaming.StreamingContext,twitterAuth: Option[twitter4j.auth.Authorization],filters: Seq[String],storageLevel: org.apache.spark.storage.StorageLevel)org.apache.spark.streaming.dstream.ReceiverInputDStream[twitter4j.Status]
[error] cannot be applied to (org.apache.spark.streaming.StreamingContext, twitter4j.auth.Authorization, Seq[String], org.apache.spark.storage.StorageLevel)
[error] val tweets = TwitterUtils.createStream(ssc, atwitter, filters, DISK_ONLY_2)
[error] ^
[error] one error found
[error] (compile:compile) Compilation failed
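I don't understand why the types don't match and why the compiler treats my call as an overloaded-method problem. Can you help? When I replace atwitter (the authorization object) with None, it compiles successfully!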
As you can see in the error log, the function signature is:
createStream(
  ssc: org.apache.spark.streaming.StreamingContext,
  twitterAuth: Option[twitter4j.auth.Authorization],
  filters: Seq[String],
  storageLevel: org.apache.spark.storage.StorageLevel
): org.apache.spark.streaming.dstream.ReceiverInputDStream[twitter4j.Status]
You are trying to use it like this:
val tweets = TwitterUtils.createStream(ssc, atwitter, filters, DISK_ONLY_2)
which means you are calling it with the following argument types:
createStream(
  org.apache.spark.streaming.StreamingContext,
  twitter4j.auth.Authorization,
  Seq[String],
  org.apache.spark.storage.StorageLevel
)
Notice the difference? It expects Option[twitter4j.auth.Authorization], but you are passing a plain twitter4j.auth.Authorization. So you need to use Some to wrap the twitter4j.auth.Authorization in the Option monad:
val tweets = TwitterUtils.createStream( ssc, Some( atwitter ), filters, DISK_ONLY_2 )
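The mismatch can be reproduced without Spark at all. The following is a minimal sketch, assuming stand-in types: Authorization, ScalaContext, JavaContext and this createStream are hypothetical and only mirror the shape of the two alternatives listed in the compiler error. It also shows why None compiled: None is itself an Option, so it fits the Scala overload. As far as the Spark documentation describes it, passing None to the real API makes Twitter4J fall back on its default OAuth configuration rather than the keys built in the snippet.

```scala
// Stand-in types only; none of these are the real Spark or Twitter4j classes.
object OverloadSketch {
  final case class Authorization(name: String)
  final class ScalaContext
  final class JavaContext

  // Shaped like the Java-facing alternative from the error message.
  def createStream(jssc: JavaContext, auth: Authorization, filters: Array[String]): String =
    "java overload"

  // Shaped like the Scala-facing alternative from the error message.
  def createStream(ssc: ScalaContext, auth: Option[Authorization], filters: Seq[String]): String =
    "scala overload"

  val ssc = new ScalaContext
  val auth = Authorization("demo")
  val filters = Seq("spark", "scala")

  // createStream(ssc, auth, filters)
  // would not compile: no alternative takes (ScalaContext, Authorization, Seq[String]).

  // Some(auth) is an Option[Authorization], so the Scala overload applies.
  val viaSome: String = createStream(ssc, Some(auth), filters)

  // None is also an Option[Authorization], which is why passing None compiled.
  val viaNone: String = createStream(ssc, None, filters)

  def main(args: Array[String]): Unit = {
    println(viaSome)
    println(viaNone)
  }
}
```

Neither alternative is "closer" to the failing call: the Java one has the right authorization type but the wrong context and filter types, and the Scala one has the right context but needs an Option.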
For more on the Option type, see The Neophyte's Guide to Scala by Daniel Westheide -> http://danielwestheide.com/blog/2012/12/19/the-neophytes-guide-to-scala-part-5-the-option-type.html, arguably one of the best Scala resources for people who already know the basics of the language.
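I think atwitter must be an Option[T] to disambiguate the call.
You can declare it as:
val atwitter : Option[twitter4j.auth.Authorization] = Some(twitter_auth.getInstance(a).getAuthorization())
so that the original call compiles unchanged:
val tweets = TwitterUtils.createStream(ssc, atwitter, filters, DISK_ONLY_2)
Alternatively, keep the original declaration and pass Some(atwitter) in the call, as mentioned above.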
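The two variants differ only in where the wrapping happens. Here is a small sketch of that, again with a hypothetical stand-in Authorization class and a made-up describe method in place of the real API. It also shows Option(...), which turns a null reference into None; whether getAuthorization() can return null is not something this thread establishes, so treat that part as a general Scala idiom rather than a required fix.

```scala
object OptionTypingSketch {
  // Hypothetical stand-in for twitter4j.auth.Authorization.
  final case class Authorization(name: String)

  // Hypothetical consumer that, like the Scala overload, expects an Option.
  def describe(auth: Option[Authorization]): String =
    auth.map(a => s"authorized as ${a.name}").getOrElse("default authorization")

  val raw = Authorization("demo")

  // Explicit annotation: the static type is Option[Authorization].
  val annotated: Option[Authorization] = Some(raw)

  // No annotation: the inferred type is Some[Authorization],
  // which is still accepted wherever Option[Authorization] is expected.
  val inferred = Some(raw)

  // Option(...) maps a null reference to None instead of Some(null).
  val fromNull: Option[Authorization] = Option(null: Authorization)

  def main(args: Array[String]): Unit = {
    println(describe(annotated))
    println(describe(inferred))
    println(describe(Some(raw))) // wrapping at the call site
    println(describe(fromNull))
  }
}
```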
Here is the test class for this API: https://github.com/apache/spark/blob/master/external/twitter/src/test/scala/org/apache/spark/streaming/twitter/TwitterStreamSuite.scala