How to drop other results in Spark Streaming?
I want to do a word count on Twitter that shows only the words I am interested in.
So I wrote the following code:
import java.util.Properties
import org.apache.log4j._
import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.twitter.TwitterUtils
import twitter4j.Status
import twitter4j.TwitterFactory
import twitter4j.auth.OAuthAuthorization
import twitter4j.conf.ConfigurationBuilder
val appName = "TwitterData"
val ssc = new StreamingContext(sc, Seconds(10))
val hashTags = "XRP"
val cb = new ConfigurationBuilder
val prop = new Properties()
cb.setDebugEnabled(true).setOAuthConsumerKey("key number").setOAuthConsumerSecret("key number").setOAuthAccessToken("key number").setOAuthAccessTokenSecret("key number")
val bld = cb.build()
val tf = new TwitterFactory(bld)
val twitter = tf.getInstance()
val filters = Array(hashTags).toSeq
val auth = new OAuthAuthorization(bld)
val twitterStream = TwitterUtils.createStream(ssc, Some(auth), filters, StorageLevel.MEMORY_ONLY)
twitterStream.cache()
val lines = twitterStream.map(status => status.getText)
lines.print()
val words = lines.flatMap(_.split(" "))
val pairs = words.map(x => {
  if (x == "xrp" || x == "ripple") {
    (x, 1)
  } else {
  }
})
pairs.print()
ssc.start()
It runs fine on Spark Streaming with Twitter, but in the output I want to drop all the empty results and keep only the ones I am after.
-------------------------------------------
Time: 1603866040000 ms
-------------------------------------------
@RuleXRP I need 15to25 usd per xrp
RT @Grayscale: 10/27/20 UPDATE: Net Assets Under Management, Holdings per Share, and Market Price per Share for our Investment Products.
T....
-------------------------------------------
Time: 1603866040000 ms
-------------------------------------------
()
()
()
()
()
()
(xrp,1)
()
()
()
...
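How can I do that? If there is a better way than my code to get only the results I want, please let me know. I need your help.
I would really appreciate your advice.
Thanks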
val pairs = words.map(x => {
  if (x == "xrp" || x == "ripple") {
    (x, 1)
  } else {
  }
})
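This maps every word to a result: words that match become (x, 1), and all the others fall into the empty else branch, which evaluates to Unit and is printed as ().
Instead, you can apply a filter before the map, which also shortens your code a little: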
val pairs = words
  .filter(x => x == "xrp" || x == "ripple")
  .map(x => (x, 1))
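The difference between the two versions can be checked without a Twitter stream. Here is a minimal sketch on an ordinary Scala List, assuming that filter and map on a DStream behave element-wise the same way they do on a local collection. The names FilterBeforeMap, wanted, mapOnly and filterThenMap are made up for this illustration and are not part of Spark or of the question's code.

```scala
object FilterBeforeMap {
  // The two words from the question that should be kept (hypothetical helper).
  val wanted: Set[String] = Set("xrp", "ripple")

  // Same shape as the question's code: an empty else branch evaluates to
  // Unit, so every non-matching word still produces an element, namely ().
  def mapOnly(words: Seq[String]): Seq[Any] =
    words.map(x => if (wanted.contains(x)) (x, 1) else ())

  // Same shape as the answer: drop unwanted words first, then build pairs.
  def filterThenMap(words: Seq[String]): Seq[(String, Int)] =
    words.filter(x => wanted.contains(x)).map(x => (x, 1))

  def main(args: Array[String]): Unit = {
    // First tweet from the sample output in the question.
    val words = "@RuleXRP I need 15to25 usd per xrp".split(" ").toList

    println(mapOnly(words))       // List((), (), (), (), (), (), (xrp,1))
    println(filterThenMap(words)) // List((xrp,1))
  }
}
```

With only (word, 1) pairs left in the stream, a per-batch count should then be possible with reduceByKey(_ + _) on the pairs DStream, though I have not run that against the Twitter receiver.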