Can anyone explain the difference in behavior between the combineGroup and reduceGroup transformations when run on a grouped DataSet?
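// Imports needed by this snippet (Flink DataSet API)
import org.apache.flink.api.common.functions.GroupReduceFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple1;
import org.apache.flink.util.Collector;

ExecutionEnvironment executionEnvironment = ExecutionEnvironment.getExecutionEnvironment();
DataSet<Integer> input = executionEnvironment.fromElements(1, 2, 2, 3, 4, 5, 6, 7, 8, 9);
// Wrap each value in a Tuple1 so that it can be grouped by field 0
input.map(new MapFunction<Integer, Tuple1<Integer>>() {
    @Override
    public Tuple1<Integer> map(Integer value) throws Exception {
        return new Tuple1<Integer>(value);
    }
}).groupBy(0).reduceGroup(new GroupReduceFunction<Tuple1<Integer>, Integer>() {
    @Override
    public void reduce(Iterable<Tuple1<Integer>> values, Collector<Integer> out) throws Exception {
        // Sum all values of one group and emit a single result
        int sum = 0;
        for (Tuple1<Integer> value : values) {
            sum += value.f0;
        }
        out.collect(sum);
    }
}).print();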
Running the reduceGroup transformation on the grouped DataSet prints the following to the console:
4
6
4
1
7
9
3
5
8
// Additionally requires: import org.apache.flink.api.common.functions.GroupCombineFunction;
input.map(new MapFunction<Integer, Tuple1<Integer>>() {
    @Override
    public Tuple1<Integer> map(Integer value) throws Exception {
        return new Tuple1<Integer>(value);
    }
}).groupBy(0).combineGroup(new GroupCombineFunction<Tuple1<Integer>, Integer>() {
    @Override
    public void combine(Iterable<Tuple1<Integer>> values, Collector<Integer> out) throws Exception {
        // Sum the values handed to this call and emit the result
        int sum = 0;
        for (Tuple1<Integer> value : values) {
            sum += value.f0;
        }
        out.collect(sum);
    }
}).print();
Running combineGroup on the grouped DataSet prints the following to the console:
4
8
1
5
9
4
6
3
7
Apart from the output order, I cannot see any difference in this example.
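In this example both compute the same thing, so you get the same result in both cases. The difference is somewhat subtle. Basically, "The GroupCombine transformation is the generalized form of the combine step in the Combinable GroupReduceFunction". According to the documentation, combineGroup is applied greedily on the individual partitions without a data exchange, so it may emit partial results for a group, whereas reduceGroup always receives the complete group.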
Read the documentation for a detailed explanation, and ask again if anything is still unclear:
- https://ci.apache.org/projects/flink/flink-docs-release-0.10/apis/dataset_transformations.html#reduce-on-grouped-dataset
- https://ci.apache.org/projects/flink/flink-docs-release-0.10/apis/dataset_transformations.html#groupreduce-on-grouped-dataset
- https://ci.apache.org/projects/flink/flink-docs-release-0.10/apis/dataset_transformations.html#groupcombine-on-a-grouped-dataset
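
To make the "partial results" point concrete, here is a minimal sketch in plain Java that does not use Flink at all. It only imitates the two behaviors under a stated assumption: the input has been split into two partitions and the two `2` values ended up in different ones. The class name `GroupCombineSketch`, the helper methods, and the partition layout are all hypothetical and chosen for illustration; how Flink actually distributes the records depends on the job and its parallelism.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class GroupCombineSketch {

    // Groups equal values within ONE list and emits one sum per distinct key, in key order.
    static List<Integer> sumPerKey(List<Integer> values) {
        Map<Integer, Integer> sums = new TreeMap<>();
        for (int v : values) {
            sums.merge(v, v, Integer::sum);
        }
        return new ArrayList<>(sums.values());
    }

    // Imitates combineGroup: every partition is summed locally, with no data exchange,
    // so a key that occurs in two partitions yields two partial sums.
    static List<Integer> simulateCombineGroup(List<List<Integer>> partitions) {
        List<Integer> out = new ArrayList<>();
        for (List<Integer> partition : partitions) {
            out.addAll(sumPerKey(partition));
        }
        return out;
    }

    // Imitates reduceGroup: all records are brought together first (the "shuffle"),
    // so each key is summed exactly once over its complete group.
    static List<Integer> simulateReduceGroup(List<List<Integer>> partitions) {
        List<Integer> all = new ArrayList<>();
        for (List<Integer> partition : partitions) {
            all.addAll(partition);
        }
        return sumPerKey(all);
    }

    public static void main(String[] args) {
        // Assumed layout: the two 2s land in different partitions.
        List<List<Integer>> partitions = Arrays.asList(
                Arrays.asList(1, 2, 3, 4, 5),
                Arrays.asList(2, 6, 7, 8, 9));

        System.out.println(simulateCombineGroup(partitions)); // [1, 2, 3, 4, 5, 2, 6, 7, 8, 9]
        System.out.println(simulateReduceGroup(partitions));  // [1, 4, 3, 4, 5, 6, 7, 8, 9]
    }
}
```

In the simulated combine, the key `2` shows up as two partial sums of 2 instead of one sum of 4. That is why a combineGroup is typically followed by a reduceGroup (or another grouping step) that merges the partial results. In the run from the question each group apparently reached the combine function whole, which would explain why both outputs contain the same values.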