如何使用 Spark 从多个文档中获取术语文档矩阵?

How to get term-document matrix from multiple documents with Spark?

我正在尝试从多个文档中生成术语文档矩阵。我可以从已经创建的矩阵 运行 LDA 模型,现在我需要退后一步。 我试图实现一个简单的术语文档矩阵,但现在我被卡住了。我所做的是:

//GETS ALL FILES FROM INPUT PATH
JavaPairRDD<String, String> doc_words = context.wholeTextFiles(input_path);

//SPLIT BY " "
JavaPairRDD<String, String> tokenized = doc_words.flatMapValues(Preprocessing_DocumentTermMatrix.WORDS_EXTRACTOR);

//SEE METHOD WORDS_MAPPER.
JavaRDD<Tuple2<Tuple2<String, String>, Integer>> rdd = tokenized.flatMap(WORDS_MAPPER);


//METHOD WORDS_MAPPER
public static final FlatMapFunction<Tuple2<String, String>, Tuple2<Tuple2<String, String>, Integer>> WORDS_MAPPER = new FlatMapFunction<Tuple2<String, String>, Tuple2<Tuple2<String, String>, Integer>>() {

    public Iterable<Tuple2<Tuple2<String, String>, Integer>> call(Tuple2<String, String> stringIntegerTuple2) throws Exception {
        return Arrays.asList(new Tuple2<Tuple2<String, String>, Integer>(new Tuple2<String,String>(stringIntegerTuple2._1(), stringIntegerTuple2._2()), 1)); 
    } 
};

所以,这个函数给我这样的结果:

((DOC_0, TERM0), 1)
((DOC_0, TERM0), 1)
((DOC_0, TERM1), 1)
((DOC_1, TERM0), 1)
((DOC_1, TERM2), 1)

我想这没问题,但现在我需要减少它并提取这样的输出:

(DOC_0, (TERM0, 2), (TERM1, 1))
(DOC_1, (TERM0, 1), (TERM2, 1))

我试了很多东西都搞不定...有人能帮帮我吗?

这是解决方案:

JavaPairRDD<String, Iterable<Tuple2<String, Integer>>> newrdd = JavaPairRDD.fromJavaRDD(rdd).reduceByKey((a, b) -> a + b)
                .mapToPair(t -> new Tuple2<>(t._1._1, new Tuple2<>(t._1._2, t._2))).groupByKey();