Spark Analysis Reduce (Twitter Sentiment)

I have a question about Apache Spark and Java.

I am building an application that streams data from Twitter (Twitter4J), and a second application that analyzes that data: a .txt file containing the tweets as JSON.

StreamingApp: outputs tweet.txt. Example, one JSON object per line:

{"id":674534622903054336,"user":"twitter","tweet":"a tweet from twitter #twitter.","date":"2015-12-09T11:22:41CET"}

AnalyzerApp:

SparkConf conf = new SparkConf().setMaster("local[2]").setAppName("TwitterAnalyzerBigData");
final JavaSparkContext sc = new JavaSparkContext(conf);
JavaRDD<String> jsonFile = sc.textFile("whateverpath/tweets.txt");
JavaPairRDD<Long, String> tweetsFiltered = jsonFile.mapToPair(new TwitterFilterFunction());

tweetsFiltered is a JavaPairRDD<Long, String>: the key is the tweet ID (Long) and the value is the tweet text (String).

Next, I apply some map functions to get something like this:

(1,a tweet from twitter #twitter.,0.0,0.055555556,negative, TWITTER)

(This is random test data.)

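Question: how can I reduce this RDD so that I get one result per category (category, number of tweets, number of positive tweets), like this: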

TWITTER, 1, 0

Following James's answer, I tried to do the reduceByKey in Java:

JavaRDD<Tuple3<String, Float, Float>> categoryEntryRDD = categoryResult.map(new Function<Tuple4<Long, String, String, String>, Tuple3<String, Float, Float>>() {
            @Override
            public Tuple3<String, Float, Float> call(Tuple4<Long, String, String, String> tuple4) throws Exception {
                if(tuple4._3().equals("positive")){
                    return new Tuple3<String, Float, Float>(tuple4._4(), 1F, 1F);
                } else {
                    return new Tuple3<String, Float, Float>(tuple4._4(), 1F, 0F);
                }

            }
        });


    Tuple3<String, Float, Float> reducedRDD = categoryEntryRDD.reduce(new Function2<Tuple3<String, Float, Float>, Tuple3<String, Float, Float>, Tuple3<String, Float, Float>>() {
        @Override
        public Tuple3<String, Float, Float> call(Tuple3<String, Float, Float> tuple31, Tuple3<String, Float, Float> tuple32) throws Exception {
            System.out.println(tuple31.toString());

            return new Tuple3<String, Float, Float>(tuple31._1(), tuple31._2()+tuple32._2(), tuple31._3()+tuple32._3());
        }
    });

But reduce is not the same as reduceByKey: reduce collapses the whole RDD into a single tuple. How can I fix this?

My output is {TWITTER, 1000, 400}, but I also have a FACEBOOK category with 1000 tweets.
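Why reduce yields only one tuple: RDD.reduce folds every element of the RDD into a single value, and the Function2 above always keeps tuple31._1() as the category, so all categories are summed together under whichever category ends up as the first argument (in Spark the merge order across partitions is not fixed). reduceByKey applies the same kind of merge separately for each key. The sketch below is plain Java, not Spark; Row, merge, globalReduce and reducePerKey are hypothetical names, and the rows are made-up sample values:

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Plain-Java simulation (no Spark) contrasting a global reduce with a per-key reduce.
class ReduceVsReduceByKey {

    // One (category, total, positive) value, like the Tuple3 in the question.
    static final class Row {
        final String category;
        final float total;
        final float positive;

        Row(String category, float total, float positive) {
            this.category = category;
            this.total = total;
            this.positive = positive;
        }

        @Override
        public String toString() {
            return "(" + category + ", " + total + ", " + positive + ")";
        }
    }

    // Same merge as the question's Function2: keeps the FIRST argument's category.
    static Row merge(Row a, Row b) {
        return new Row(a.category, a.total + b.total, a.positive + b.positive);
    }

    // Like RDD.reduce: one fold over ALL rows, so every category collapses into one result.
    static Row globalReduce(List<Row> rows) {
        Row acc = rows.get(0);
        for (int i = 1; i < rows.size(); i++) {
            acc = merge(acc, rows.get(i));
        }
        return acc;
    }

    // Like reduceByKey: the same merge, applied separately for each category.
    static Map<String, Row> reducePerKey(List<Row> rows) {
        Map<String, Row> out = new LinkedHashMap<>();
        for (Row r : rows) {
            out.merge(r.category, r, ReduceVsReduceByKey::merge);
        }
        return out;
    }

    public static void main(String[] args) {
        List<Row> rows = List.of(
            new Row("TWITTER", 1, 1),
            new Row("FACEBOOK", 1, 0),
            new Row("TWITTER", 1, 0));
        System.out.println(globalReduce(rows)); // (TWITTER, 3.0, 1.0)
        System.out.println(reducePerKey(rows)); // {TWITTER=(TWITTER, 2.0, 1.0), FACEBOOK=(FACEBOOK, 1.0, 0.0)}
    }
}
```

The FACEBOOK row disappears into the TWITTER total with the global fold, which matches the symptom described above.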

This is a good, classic map-reduce problem:

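  1. Map each tweet entry to a tuple holding its category and a count of 1
  2. Reduce the tuples by category to sum up the counts for each category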

In pseudocode:

+ map the RDD you have (id, tweet, pos score...
- map to a tuple that looks like (category, 1, 1) if the tweet is positive
- map to a tuple that looks like (category, 1, 0) if the tweet is negative

+ reduceByKey where our key is the category using summation
- we end up with an RDD of tuples in the form you want
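The two pseudocode steps can be sketched in plain Java (no Spark), with a LinkedHashMap merge standing in for reduceByKey. toCategoryCounts and reduceByCategory are hypothetical names, and the input is assumed to be just the sentiment and category fields (positions 5 and 6 of the mapped tuple):

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Plain-Java sketch (no Spark) of "map to (category, counts)" then "reduce by category".
class CategorySentimentCounts {

    // Map step: one tweet -> (category, {1, 1 if positive else 0}).
    static Map.Entry<String, int[]> toCategoryCounts(String sentiment, String category) {
        int positive = sentiment.equals("positive") ? 1 : 0;
        return Map.entry(category, new int[] {1, positive});
    }

    // Reduce-by-key step: sum {total, positive} separately for each category.
    static Map<String, int[]> reduceByCategory(List<Map.Entry<String, int[]>> pairs) {
        Map<String, int[]> sums = new LinkedHashMap<>();
        for (Map.Entry<String, int[]> p : pairs) {
            sums.merge(p.getKey(), p.getValue(),
                (a, b) -> new int[] {a[0] + b[0], a[1] + b[1]});
        }
        return sums;
    }

    public static void main(String[] args) {
        String[][] tweets = {{"negative", "TWITTER"}, {"positive", "TWITTER"}, {"positive", "FACEBOOK"}};
        List<Map.Entry<String, int[]>> pairs = new ArrayList<>();
        for (String[] t : tweets) {
            pairs.add(toCategoryCounts(t[0], t[1]));
        }
        // Prints "TWITTER, 2, 1" then "FACEBOOK, 1, 1"
        reduceByCategory(pairs).forEach((category, c) ->
            System.out.println(category + ", " + c[0] + ", " + c[1]));
    }
}
```

Keeping both counts in one value means a single pass produces (category, total, positive), with no second aggregation or join.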

Here is some Scala code that does this; the Java version is similar:

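// assumes tweetsFiltered here holds the mapped 6-field tuples shown above:
// (id, tweet, score, score, sentiment, category)
val categoryEntryRDD = tweetsFiltered.map { mappedTuple =>
    // key = category, value = (tweet count, positive count)
    if (mappedTuple._5 == "positive") {
        (mappedTuple._6, (1, 1))
    } else {
        (mappedTuple._6, (1, 0))
    }
}

// reduceByKey needs (key, value) pairs; sum both counts per category,
// then flatten (category, (total, positive)) into a 3-tuple
val reducedRDD = categoryEntryRDD
    .reduceByKey((x, y) => (x._1 + y._1, x._2 + y._2))
    .map { case (category, (total, positive)) => (category, total, positive) }

At this point reducedRDD holds tuples of the form (category, total number of tweets in that category, number of positive tweets in that category).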

I finally got it working in Java:

JavaPairRDD<String, Float> categoryPositiveTweets = categoryResult.mapToPair(new PairFunction<Tuple4<Long, String, String, String>, String, Float>() {
        @Override
        public Tuple2<String, Float> call(Tuple4<Long, String, String, String> tuple4) throws Exception {
            if(tuple4._3().equals("positive")){
                return new Tuple2<String, Float>(tuple4._4(), 1F);

            } else {
                return new Tuple2<String, Float>(tuple4._4(), 0F);
            }
        }
    }).reduceByKey(new Function2<Float, Float, Float>() {
        @Override
        public Float call(Float aFloat, Float aFloat2) throws Exception {
            return aFloat+aFloat2;
        }
    });

    JavaPairRDD<String, Float> categoryTotalTweets = categoryResult.mapToPair(new PairFunction<Tuple4<Long, String, String, String>, String, Float>() {
        @Override
        public Tuple2<String, Float> call(Tuple4<Long, String, String, String> tuple4) throws Exception {
            return new Tuple2<String, Float>(tuple4._4(), 1F);
        }
    }).reduceByKey(new Function2<Float, Float, Float>() {
        @Override
        public Float call(Float aFloat, Float aFloat2) throws Exception {
            return aFloat+aFloat2;
        }
    });

    JavaPairRDD<String, Tuple2<Float, Float>> joinedCategorizedTweets = categoryTotalTweets.join(categoryPositiveTweets);

    JavaRDD<Tuple3<String, Float, Float>> categorizedScoredTweets = joinedCategorizedTweets.map(new Function<Tuple2<String, Tuple2<Float, Float>>, Tuple3<String, Float, Float>>() {
        @Override
        public Tuple3<String, Float, Float> call(Tuple2<String, Tuple2<Float, Float>> tweet) throws Exception {
            return new Tuple3<String, Float, Float>(
                    tweet._1(),
                    tweet._2()._1(),
                    tweet._2()._2());
        }
    });
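This solution relies on join, which on a JavaPairRDD is an inner join: a category is kept only if it appears in both RDDs. It works here because categoryPositiveTweets maps negative tweets to 0F instead of filtering them out, so every category has an entry on both sides. A plain-Java sketch (no Spark; InnerJoinByCategory and innerJoin are hypothetical names, and the counts are made-up sample values) of that inner-join behavior:

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Plain-Java illustration (not Spark) of inner-join semantics on a category key.
class InnerJoinByCategory {

    // Keep a category only if it has an entry in BOTH maps, like an inner join.
    static Map<String, float[]> innerJoin(Map<String, Float> totals, Map<String, Float> positives) {
        Map<String, float[]> joined = new LinkedHashMap<>();
        for (Map.Entry<String, Float> e : totals.entrySet()) {
            Float positive = positives.get(e.getKey());
            if (positive != null) {
                joined.put(e.getKey(), new float[] {e.getValue(), positive});
            }
        }
        return joined;
    }

    public static void main(String[] args) {
        Map<String, Float> totals = new LinkedHashMap<>();
        totals.put("TWITTER", 3F);
        totals.put("FACEBOOK", 2F);

        // Positives built by mapping negatives to 0F: every category is present.
        Map<String, Float> withZeros = new LinkedHashMap<>();
        withZeros.put("TWITTER", 1F);
        withZeros.put("FACEBOOK", 0F);

        // Positives built by filtering out negatives: FACEBOOK has no entry.
        Map<String, Float> filtered = new LinkedHashMap<>();
        filtered.put("TWITTER", 1F);

        System.out.println(innerJoin(totals, withZeros).keySet()); // [TWITTER, FACEBOOK]
        System.out.println(innerJoin(totals, filtered).keySet());  // [TWITTER]
    }
}
```

With the filtered variant, a category that has no positive tweets would silently drop out of the joined result.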

Thanks for the help!

Result:

(TWITTER, 100, 40) (FACEBOOK, 80, 20)