Spark Analysis Reduce (Twitter Sentiment)
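I have a question about Apache Spark and Java.
I am building an application that streams data from Twitter (Twitter4J), and another application that analyzes that data: a .txt file containing tweets in JSON format.
StreamingApp:
Output tweet.txt:
Example (one line of JSON):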
{"id":674534622903054336,"user":"twitter","tweet":"a tweet from twitter #twitter.","date":"2015-12-09T11:22:41CET"}
AnalyzerApp:
SparkConf conf = new SparkConf().setMaster("local[2]").setAppName("TwitterAnalyzerBigData");
final JavaSparkContext sc = new JavaSparkContext(conf);
JavaRDD<String> jsonFile = sc.textFile("whateverpath/tweets.txt");
JavaPairRDD<Long, String> tweetsFiltered = jsonFile.mapToPair(new TwitterFilterFunction());
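tweetsFiltered is a JavaPairRDD of (tweet ID: Long, tweet: String).
Now I am applying some map functions to get something like this:
(1,a tweet from twitter #twitter.,0.0,0.055555556,negative, TWITTER)
(This is random test data.)
- 1: the ID
- a tweet from twitter #twitter.: the tweet
- 0.0: positive score
- 0.055555556: negative score
- negative: sentiment (positive or negative)
- TWITTER: tweet category (based on the hashtag)
Question: how can I reduce this RDD so that I get a result like this:
TWITTER, 1, 0
- TWITTER: the tweet category
- 1: total number of tweets in the TWITTER category
- 0: number of positive tweets in the TWITTER category
Following James's answer, I tried to implement the reduceByKey step in Java: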
JavaRDD<Tuple3<String, Float, Float>> categoryEntryRDD = categoryResult.map(new Function<Tuple4<Long, String, String, String>, Tuple3<String, Float, Float>>() {
    @Override
    public Tuple3<String, Float, Float> call(Tuple4<Long, String, String, String> tuple4) throws Exception {
        if (tuple4._3().equals("positive")) {
            return new Tuple3<String, Float, Float>(tuple4._4(), 1F, 1F);
        } else {
            return new Tuple3<String, Float, Float>(tuple4._4(), 1F, 0F);
        }
    }
});
Tuple3<String, Float, Float> reducedRDD = categoryEntryRDD.reduce(new Function2<Tuple3<String, Float, Float>, Tuple3<String, Float, Float>, Tuple3<String, Float, Float>>() {
    @Override
    public Tuple3<String, Float, Float> call(Tuple3<String, Float, Float> tuple31, Tuple3<String, Float, Float> tuple32) throws Exception {
        System.out.println(tuple31.toString());
        return new Tuple3<String, Float, Float>(tuple31._1(), tuple31._2() + tuple32._2(), tuple31._3() + tuple32._3());
    }
});
But the reduce method is not the same as reduceByKey. How can I fix this?
My output:
{TWITTER, 1000, 400}
But I also have another category, FACEBOOK, with 1000 tweets.
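The difference between the two calls can be illustrated without a cluster. The sketch below is plain Java, not Spark: the class ReduceVsReduceByKey, its Entry type and both methods are hypothetical names that only mimic the semantics. reduceAll folds every entry into a single result, the way RDD.reduce does, while reduceByCategory applies the same summation separately for each key, the way reduceByKey does. On a real RDD the order in which partitions are combined is not guaranteed, so which element's category survives a plain reduce is not something to rely on; here the fold is sequential, so the first entry's category wins.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Local, Spark-free illustration (hypothetical names) of reduce vs. reduceByKey.
public class ReduceVsReduceByKey {

    // One mapped tweet: (category, total count, positive count),
    // mirroring the Tuple3<String, Float, Float> from the question.
    static final class Entry {
        final String category;
        final long total;
        final long positive;

        Entry(String category, long total, long positive) {
            this.category = category;
            this.total = total;
            this.positive = positive;
        }
    }

    // Like RDD.reduce: folds EVERY element into one result. Only the first
    // element's category is kept, so all categories are summed together.
    static Entry reduceAll(List<Entry> entries) {
        Entry acc = entries.get(0);
        for (Entry e : entries.subList(1, entries.size())) {
            acc = new Entry(acc.category, acc.total + e.total, acc.positive + e.positive);
        }
        return acc;
    }

    // Like reduceByKey: the same summation, but applied per category.
    // The value array is {total, positive}.
    static Map<String, long[]> reduceByCategory(List<Entry> entries) {
        Map<String, long[]> result = new LinkedHashMap<>();
        for (Entry e : entries) {
            result.merge(e.category, new long[] {e.total, e.positive},
                    (a, b) -> new long[] {a[0] + b[0], a[1] + b[1]});
        }
        return result;
    }

    public static void main(String[] args) {
        List<Entry> entries = List.of(
                new Entry("TWITTER", 1, 1),
                new Entry("FACEBOOK", 1, 0),
                new Entry("TWITTER", 1, 0));
        Entry all = reduceAll(entries);
        System.out.println(all.category + ", " + all.total + ", " + all.positive);
        reduceByCategory(entries).forEach((category, c) ->
                System.out.println(category + ", " + c[0] + ", " + c[1]));
    }
}
```

With the three sample entries in main, reduceAll reports TWITTER, 3, 1 (the FACEBOOK tweet is silently counted under TWITTER), whereas reduceByCategory yields TWITTER, 2, 1 and FACEBOOK, 1, 0.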
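This is a nice, classic map-reduce problem:
- map each tweet entry to a tuple representing its category and a count of 1
- reduce the category tuples to sum up the counts for each category
In pseudocode: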
+ map the RDD you have (id, tweet, pos score...
- map to a tuple that looks like (category, 1, 1) if the tweet is positive
- map to a tuple that looks like (category, 1, 0) if the tweet is negative
+ reduceByKey where our key is the category using summation
- we end up with an RDD of tuples in the form you want
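Here is some Scala code that does this (the Java version is similar):
// tweetsFiltered here stands for the mapped 6-field RDD (id, tweet, pos, neg, sentiment, category).
// reduceByKey needs a pair RDD, so the key is the category and the value is (total, positive).
val categoryEntryRDD = tweetsFiltered.map { mappedTuple =>
  if (mappedTuple._5 == "positive") {
    (mappedTuple._6, (1, 1))
  } else {
    (mappedTuple._6, (1, 0))
  }
}
// Sum both counters separately for each category
val reducedRDD = categoryEntryRDD.reduceByKey((x, y) => (x._1 + y._1, x._2 + y._2))
At this point, reducedRDD holds tuples of the form (category, (total tweets in the category, positive tweets in the category)).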
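The map and reduceByKey steps can be traced in plain Java to see why summation is a safe function to hand to reduceByKey. This is a local simulation, not Spark code: CategoryCounts, toCounts, sum and countByCategory are hypothetical names, each tweet is cut down to a {sentiment, category} pair, and the inner lists stand in for partitions. Mirroring what reduceByKey does, values are first combined inside each partition and the partial results are then merged; because the sum is associative and commutative, the outcome does not depend on how the tweets are split across partitions.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java simulation (hypothetical names) of map + reduceByKey over partitions.
public class CategoryCounts {

    // Map step: one tweet becomes {total = 1, positive = 1 or 0}.
    static long[] toCounts(String sentiment) {
        return new long[] {1, "positive".equals(sentiment) ? 1 : 0};
    }

    // Reduce function: element-wise sum. It is associative and commutative,
    // so it can be applied in any order and across partitions.
    static long[] sum(long[] a, long[] b) {
        return new long[] {a[0] + b[0], a[1] + b[1]};
    }

    // Each tweet is {sentiment, category}; each inner list is one "partition".
    static Map<String, long[]> countByCategory(List<List<String[]>> partitions) {
        Map<String, long[]> merged = new HashMap<>();
        for (List<String[]> partition : partitions) {
            // 1. Combine values per category inside the partition.
            Map<String, long[]> local = new HashMap<>();
            for (String[] tweet : partition) {
                local.merge(tweet[1], toCounts(tweet[0]), CategoryCounts::sum);
            }
            // 2. Merge the partial result into the global result.
            local.forEach((category, counts) -> merged.merge(category, counts, CategoryCounts::sum));
        }
        return merged;
    }

    public static void main(String[] args) {
        List<List<String[]>> partitions = List.of(
                List.of(new String[] {"positive", "TWITTER"}, new String[] {"negative", "FACEBOOK"}),
                List.of(new String[] {"negative", "TWITTER"}, new String[] {"positive", "FACEBOOK"}));
        countByCategory(partitions).forEach((category, c) ->
                System.out.println(category + ", " + c[0] + ", " + c[1]));
    }
}
```

A HashMap is used for brevity, so the printed order of categories is unspecified; only the per-category counts matter.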
I finally got it working in Java:
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import scala.Tuple2;
import scala.Tuple3;
import scala.Tuple4;

// categoryResult is a JavaRDD of Tuple4 whose third field is the sentiment and fourth the category.
// Positive tweets per category: emit 1 for a positive tweet, 0 otherwise, then sum by category.
JavaPairRDD<String, Float> categoryPositiveTweets = categoryResult.mapToPair(new PairFunction<Tuple4<Long, String, String, String>, String, Float>() {
    @Override
    public Tuple2<String, Float> call(Tuple4<Long, String, String, String> tuple4) throws Exception {
        if (tuple4._3().equals("positive")) {
            return new Tuple2<String, Float>(tuple4._4(), 1F);
        } else {
            return new Tuple2<String, Float>(tuple4._4(), 0F);
        }
    }
}).reduceByKey(new Function2<Float, Float, Float>() {
    @Override
    public Float call(Float aFloat, Float aFloat2) throws Exception {
        return aFloat + aFloat2;
    }
});
// Total tweets per category: emit 1 for every tweet, then sum by category.
JavaPairRDD<String, Float> categoryTotalTweets = categoryResult.mapToPair(new PairFunction<Tuple4<Long, String, String, String>, String, Float>() {
    @Override
    public Tuple2<String, Float> call(Tuple4<Long, String, String, String> tuple4) throws Exception {
        return new Tuple2<String, Float>(tuple4._4(), 1F);
    }
}).reduceByKey(new Function2<Float, Float, Float>() {
    @Override
    public Float call(Float aFloat, Float aFloat2) throws Exception {
        return aFloat + aFloat2;
    }
});
// Join both counts on the category key: (category, (total, positive)).
JavaPairRDD<String, Tuple2<Float, Float>> joinedCategorizedTweets = categoryTotalTweets.join(categoryPositiveTweets);
// Flatten into (category, total, positive).
JavaRDD<Tuple3<String, Float, Float>> categorizedScoredTweets = joinedCategorizedTweets.map(new Function<Tuple2<String, Tuple2<Float, Float>>, Tuple3<String, Float, Float>>() {
    @Override
    public Tuple3<String, Float, Float> call(Tuple2<String, Tuple2<Float, Float>> tweet) throws Exception {
        return new Tuple3<String, Float, Float>(
                tweet._1(),
                tweet._2()._1(),
                tweet._2()._2());
    }
});
Thanks for the help!
Result:
(TWITTER, 100, 40)
(FACEBOOK, 80, 20)