Spark - 字数统计替代方法
Spark - Word count Alternate Approach
我有 100 万行,
除了传统的将每个单词映射到 1 然后按键减少它的方法之外,还有其他方法可以在 spark 中实现单词计数吗?
传统方法:
JavaPairRDD<String, Integer> counts = textFile.flatMap(s ->
Arrays.asList(SPACE.split(s)).iterator())
.mapToPair(s -> new Tuple2<>(s, 1))
.reduceByKey((a, b) -> a + b);
有什么新方法吗?
肯定有很多方法可以做到这一点。这里有 2 个:
一:平面图制作数据框:
JavaRDD<Row> rowRdd = spark.read()
.textFile("loremipsum.txt")
.javaRDD()
.flatMap(s -> Arrays.asList(s.split(" ")).iterator())
.map(s -> RowFactory.create(s));
spark.createDataFrame(rowRdd,
new StructType()
.add(DataTypes.createStructField("word", DataTypes.StringType, true)))
.groupBy("word")
.count()
.show();
打印如下内容:
+------------+-----+
| word|count|
+------------+-----+
| Sit| 17|
| Elit| 6|
| vehicula.| 2|
| eros.| 2|
| nam.| 3|
| porttitor| 18|
|consectetur.| 6|
...
奖励:使用 SQL 进行分组(如果这算作另一种选择)
二:按单词分组并计算可迭代对象中的元素:
Map<String, Long> counts = spark.read().textFile("loremipsum.txt")
.javaRDD()
.flatMap(s -> Arrays.asList(s.split(" ")).iterator())
.groupBy(i -> i)
.aggregateByKey(0L, (id, it) -> countIterable(it), (a, b) -> a + b)
.collect() //collection of Tuple2: you can stop here
.stream()
.collect(Collectors.toMap(t -> t._1, t -> t._2));
结果如下:
{=50, Malesuada=4, justo.=3, potenti=2, vel.=11, purus=30, curabitur.=2...}
countIterable
定义为:
private static <T> long countIterable(Iterable<T> it) {
long res = 0;
for (T t : it)
res += 1;
return res;
}
也可以实现为
return StreamSupport.stream(it.spliterator(), false).count();
导入所需的包,例如 "org.apache.spark.sql.functions.*"
斯卡拉:
val strDF = spark.read.text("test.txt")
strDF.select(explode(split(col("line")," ")).as("word")).groupBy(col("word")).count.show
Java:
String filePath = "/test.txt";
Dataset<Row> lines = sparkSession.read().text(filePath).toDF("line");
lines.select(explode(split(col("line")," ")).as("word")).groupBy("word").count().show();
我有 100 万行, 除了传统的将每个单词映射到 1 然后按键减少它的方法之外,还有其他方法可以在 spark 中实现单词计数吗?
传统方法:
JavaPairRDD<String, Integer> counts = textFile.flatMap(s ->
Arrays.asList(SPACE.split(s)).iterator())
.mapToPair(s -> new Tuple2<>(s, 1))
.reduceByKey((a, b) -> a + b);
有什么新方法吗?
肯定有很多方法可以做到这一点。这里有 2 个:
一:平面图制作数据框:
JavaRDD<Row> rowRdd = spark.read()
.textFile("loremipsum.txt")
.javaRDD()
.flatMap(s -> Arrays.asList(s.split(" ")).iterator())
.map(s -> RowFactory.create(s));
spark.createDataFrame(rowRdd,
new StructType()
.add(DataTypes.createStructField("word", DataTypes.StringType, true)))
.groupBy("word")
.count()
.show();
打印如下内容:
+------------+-----+
| word|count|
+------------+-----+
| Sit| 17|
| Elit| 6|
| vehicula.| 2|
| eros.| 2|
| nam.| 3|
| porttitor| 18|
|consectetur.| 6|
...
奖励:使用 SQL 进行分组(如果这算作另一种选择)
二:按单词分组并计算可迭代对象中的元素:
Map<String, Long> counts = spark.read().textFile("loremipsum.txt")
.javaRDD()
.flatMap(s -> Arrays.asList(s.split(" ")).iterator())
.groupBy(i -> i)
.aggregateByKey(0L, (id, it) -> countIterable(it), (a, b) -> a + b)
.collect() //collection of Tuple2: you can stop here
.stream()
.collect(Collectors.toMap(t -> t._1, t -> t._2));
结果如下:
{=50, Malesuada=4, justo.=3, potenti=2, vel.=11, purus=30, curabitur.=2...}
countIterable
定义为:
private static <T> long countIterable(Iterable<T> it) {
long res = 0;
for (T t : it)
res += 1;
return res;
}
也可以实现为
return StreamSupport.stream(it.spliterator(), false).count();
导入所需的包,例如 "org.apache.spark.sql.functions.*"
斯卡拉:
val strDF = spark.read.text("test.txt")
strDF.select(explode(split(col("line")," ")).as("word")).groupBy(col("word")).count.show
Java:
String filePath = "/test.txt";
Dataset<Row> lines = sparkSession.read().text(filePath).toDF("line");
lines.select(explode(split(col("line")," ")).as("word")).groupBy("word").count().show();