对 JavaDStream 进行排序 - Spark Streaming
Sorting a JavaDStream - Spark Streaming
我有一个使用 JavaDStreams
对象的应用程序。
这是一段代码,我在其中计算单词出现的频率。
JavaPairDStream<String, Integer> wordCounts = words.mapToPair(
new PairFunction<String, String, Integer>() {
@Override
public Tuple2<String, Integer> call(String s) {
return new Tuple2<>(s, 1);
}
}).reduceByKey(new Function2<Integer, Integer, Integer>() {
@Override
public Integer call(Integer i1, Integer i2) {
return i1 + i2;
}
});
现在,如果我想打印前 N 个频繁出现的元素,按整数值排序,如果没有像 sortByKey(对于 JavaPairRDD)这样的方法,我该怎么办?
因为你有 JavaPairDStream<String, Integer>
并且想按整数值排序,你必须先交换对。
JavaPairDStream<Integer,String> swappedPair = wordCounts.mapToPair(x -> x.swap());
现在您可以使用 transformToPair
排序并使用 sortByKey
函数。
JavaPairDStream<Integer,String> sortedStream = swappedPair.transformToPair(
new Function<JavaPairRDD<Integer,String>, JavaPairRDD<Integer,String>>() {
@Override
public JavaPairRDD<Integer,String> call(JavaPairRDD<Integer,String> jPairRDD) throws Exception {
return jPairRDD.sortByKey(false);
}
});
sortedStream.print();
简化:
JavaPairDStream<String, Long> counts = lines.countByValue();
JavaPairDStream<Long,String> swappedPair = counts.mapToPair(Tuple2::swap);
JavaPairDStream<Long,String> sortedStream = swappedPair.transformToPair(s -> s.sortByKey(false));
我有一个使用 JavaDStreams
对象的应用程序。
这是一段代码,我在其中计算单词出现的频率。
JavaPairDStream<String, Integer> wordCounts = words.mapToPair(
new PairFunction<String, String, Integer>() {
@Override
public Tuple2<String, Integer> call(String s) {
return new Tuple2<>(s, 1);
}
}).reduceByKey(new Function2<Integer, Integer, Integer>() {
@Override
public Integer call(Integer i1, Integer i2) {
return i1 + i2;
}
});
现在,如果我想打印前 N 个频繁出现的元素,按整数值排序,如果没有像 sortByKey(对于 JavaPairRDD)这样的方法,我该怎么办?
因为你有 JavaPairDStream<String, Integer>
并且想按整数值排序,你必须先交换对。
JavaPairDStream<Integer,String> swappedPair = wordCounts.mapToPair(x -> x.swap());
现在您可以使用 transformToPair
排序并使用 sortByKey
函数。
JavaPairDStream<Integer,String> sortedStream = swappedPair.transformToPair(
new Function<JavaPairRDD<Integer,String>, JavaPairRDD<Integer,String>>() {
@Override
public JavaPairRDD<Integer,String> call(JavaPairRDD<Integer,String> jPairRDD) throws Exception {
return jPairRDD.sortByKey(false);
}
});
sortedStream.print();
简化:
JavaPairDStream<String, Long> counts = lines.countByValue();
JavaPairDStream<Long,String> swappedPair = counts.mapToPair(Tuple2::swap);
JavaPairDStream<Long,String> sortedStream = swappedPair.transformToPair(s -> s.sortByKey(false));