以下几种在 Apache Flink 中进行字数统计的方法有什么区别?
What is the difference between the follow ways to do word count in Apache Flink?
Apache Flink 提供了很多对DataSet 的操作。有点难以理解数据在集群中是如何处理的。例如 WordCount 有不同的实现。有什么区别?
如果有一些文档可以解释集群中这些工具的数据流是什么,那将是非常有帮助的。
// get input data
DataSet<String> text = env.fromElements(
"To be, or not to be,--that is the question:--",
"Whether 'tis nobler in the mind to suffer",
"The slings and arrows of outrageous fortune",
"Or to take arms against a sea of troubles,"
);
// WordCount 1
text.flatMap(new LineSplitter()).groupBy(0).sum(1).print();
// WordCount 2
text.flatMap(new LineSplitter()).groupBy(0).aggregate(Aggregations.SUM, 1).print();
// WordCount 3
text.flatMap(new LineSplitter()).groupBy(0)
.reduce(new ReduceFunction<Tuple2<String, Integer>>() {
@Override
public Tuple2<String, Integer> reduce(Tuple2<String, Integer> t1, Tuple2<String, Integer> t2) throws Exception {
return new Tuple2<String, Integer>(t1.f0, t1.f1+t2.f1);
}
}).print();
// WordCount 4
text.flatMap(new LineSplitter()).groupBy(0)
.reduceGroup(new GroupReduceFunction<Tuple2<String, Integer>, Tuple2<String, Integer>>() {
@Override
public void reduce(Iterable<Tuple2<String, Integer>> iterable, Collector<Tuple2<String, Integer>> collector) throws Exception {
int prefixSum = 0;
String key = null;
for (Tuple2<String, Integer> t : iterable) {
prefixSum += t.f1;
key = t.f0;
}
collector.collect(new Tuple2<String, Integer>(key, prefixSum));
}
}).print();
// WordCount 5
text.flatMap(new LineSplitter())
.reduceGroup(new GroupReduceFunction<Tuple2<String, Integer>, Tuple2<String, Integer>>() {
@Override
public void reduce(Iterable<Tuple2<String, Integer>> iterable, Collector<Tuple2<String, Integer>> collector) throws Exception {
HashMap<String, Integer> map = new HashMap<String, Integer>();
for(Tuple2<String, Integer> t : iterable){
if(map.containsKey(t.f0)){
map.replace(t.f0, map.get(t.f0)+t.f1);
} else {
map.put(t.f0, t.f1);
}
}
for(Map.Entry<String, Integer> pair : map.entrySet()){
collector.collect(new Tuple2<String, Integer>(pair.getKey(), pair.getValue()));
}
}
}).print();
除 WordCount 5 外,所有程序的执行都与常规 MapReduce WordCount 程序非常相似(基于哈希的随机播放和基于排序的分组)。
- WordCount 1 是 WordCount 2 的语法糖
- WordCount 2 在内部使用
GroupReduceFunction
执行,与 WordCount 4 中的类似。唯一的区别是内部 GroupReduceFunction
实现了 Combinable
接口,以便支持部分聚合。
- WordCount 3 使用类似于
GroupReduceFunction
执行的 ReduceFunction
。但是,由于不同的接口,ReduceFunction
始终是可组合的(不需要单独的 combine
方法)。
- WordCount 4 的执行就像常规的 MapReduce 程序一样:使用散列分区和基于排序的分组进行随机播放。因为
GroupReduceFunction
没有实现Combinable
接口,所以这个程序是在没有本地预聚合的情况下执行的,因此效率低于前三个程序。
- WordCount 5 效率很低,不应使用,因为
GroupReduceFunction
不能并行执行。由于没有 groupBy()
调用,所有数据都发送到同一个 Reducer 并作为一个大组处理。首先,这会很慢,因为它是在单线程中执行的,并且受到单机网络吞吐量的限制。其次,如果不同键的数量增长太大,这个程序很容易失败,因为分组是使用内存中的 HashMap
完成的。
Apache Flink 提供了很多对DataSet 的操作。有点难以理解数据在集群中是如何处理的。例如 WordCount 有不同的实现。有什么区别?
如果有一些文档可以解释集群中这些工具的数据流是什么,那将是非常有帮助的。
// get input data
DataSet<String> text = env.fromElements(
"To be, or not to be,--that is the question:--",
"Whether 'tis nobler in the mind to suffer",
"The slings and arrows of outrageous fortune",
"Or to take arms against a sea of troubles,"
);
// WordCount 1
text.flatMap(new LineSplitter()).groupBy(0).sum(1).print();
// WordCount 2
text.flatMap(new LineSplitter()).groupBy(0).aggregate(Aggregations.SUM, 1).print();
// WordCount 3
text.flatMap(new LineSplitter()).groupBy(0)
.reduce(new ReduceFunction<Tuple2<String, Integer>>() {
@Override
public Tuple2<String, Integer> reduce(Tuple2<String, Integer> t1, Tuple2<String, Integer> t2) throws Exception {
return new Tuple2<String, Integer>(t1.f0, t1.f1+t2.f1);
}
}).print();
// WordCount 4
text.flatMap(new LineSplitter()).groupBy(0)
.reduceGroup(new GroupReduceFunction<Tuple2<String, Integer>, Tuple2<String, Integer>>() {
@Override
public void reduce(Iterable<Tuple2<String, Integer>> iterable, Collector<Tuple2<String, Integer>> collector) throws Exception {
int prefixSum = 0;
String key = null;
for (Tuple2<String, Integer> t : iterable) {
prefixSum += t.f1;
key = t.f0;
}
collector.collect(new Tuple2<String, Integer>(key, prefixSum));
}
}).print();
// WordCount 5
text.flatMap(new LineSplitter())
.reduceGroup(new GroupReduceFunction<Tuple2<String, Integer>, Tuple2<String, Integer>>() {
@Override
public void reduce(Iterable<Tuple2<String, Integer>> iterable, Collector<Tuple2<String, Integer>> collector) throws Exception {
HashMap<String, Integer> map = new HashMap<String, Integer>();
for(Tuple2<String, Integer> t : iterable){
if(map.containsKey(t.f0)){
map.replace(t.f0, map.get(t.f0)+t.f1);
} else {
map.put(t.f0, t.f1);
}
}
for(Map.Entry<String, Integer> pair : map.entrySet()){
collector.collect(new Tuple2<String, Integer>(pair.getKey(), pair.getValue()));
}
}
}).print();
除 WordCount 5 外,所有程序的执行都与常规 MapReduce WordCount 程序非常相似(基于哈希的随机播放和基于排序的分组)。
- WordCount 1 是 WordCount 2 的语法糖
- WordCount 2 在内部使用
GroupReduceFunction
执行,与 WordCount 4 中的类似。唯一的区别是内部GroupReduceFunction
实现了Combinable
接口,以便支持部分聚合。 - WordCount 3 使用类似于
GroupReduceFunction
执行的ReduceFunction
。但是,由于不同的接口,ReduceFunction
始终是可组合的(不需要单独的combine
方法)。 - WordCount 4 的执行就像常规的 MapReduce 程序一样:使用散列分区和基于排序的分组进行随机播放。因为
GroupReduceFunction
没有实现Combinable
接口,所以这个程序是在没有本地预聚合的情况下执行的,因此效率低于前三个程序。 - WordCount 5 效率很低,不应使用,因为
GroupReduceFunction
不能并行执行。由于没有groupBy()
调用,所有数据都发送到同一个 Reducer 并作为一个大组处理。首先,这会很慢,因为它是在单线程中执行的,并且受到单机网络吞吐量的限制。其次,如果不同键的数量增长太大,这个程序很容易失败,因为分组是使用内存中的HashMap
完成的。