Apache Flink:为什么 reduce 或 groupReduce 转换不能并行运行?
Apache Flink: Why do reduce or groupReduce transformations not operate in parallel?
例如:
DataSet<Tuple1<Long>> input = env.fromElements(1,2,3,4,5,6,7,8,9);
DataSet<Tuple1<Long>> sum = input.reduce(new ReduceFunction()<Tuple1<Long>,Tuple1<Long>>{
public Tuple1<Long> reduce(Tuple1<Long> value1,Tuple1<Long> value2){
return new Tuple1<>(value1.f0 + value2.f0);
}
}
如果上面的reduce转换不是并行操作,我是否需要使用额外的两个转换'partitionByHash'和'mapPartition'如下:
DataSet<Tuple1<Long>> input = env.fromElements(1,2,3,4,5,6,7,8,9);
DataSet<Tuple1<Long>> sum = input.partitionByHash(0).mapPartition(new MapPartitionFunction()<Tuple1<Long>,Tuple1<Long>>{
public void map(Iterable<Tuple1<Long>> values,Collector<Tuple1<Long>> out){
long sum = getSum(values);
out.collect(new Tuple1(sum));
}
}).reduce(new ReduceFunction()<Tuple1<Long>,Tuple1<Long>>{
public Tuple1<Long> reduce(Tuple1<Long> value1,Tuple1<Long> value2){
return new Tuple1<>(value1.f0 + value2.f0);
}
}
以及为什么reduce转换的结果仍然是DataSet的实例而不是Tuple1<Long>
的实例
reduce
和 reduceGroup
都是 group-wise 操作,应用于记录组。如果不使用 groupBy
指定分组键,则数据集中的所有记录都属于同一组。因此,只有一个组,reduce
和reduceGroup
的最终结果不能并行计算。
如果 reduce 转换是可组合的(对于任何 ReduceFunction
和所有可组合的 GroupReduceFunction
都是如此),Flink 可以并行应用组合器。
你的两个问题的两个答案:
(1) 为什么 reduce() 不是并行的
Fabian 给了很好的解释。如果按键应用,则操作是并行的。否则只有 pre-aggregation 是平行的。
在你的第二个例子中,你通过引入一个键使其并行。除了使用 "mapPartition()" 的复杂解决方法,您还可以简单地编写 (Java 8 Style)
DataSet<Tuple1<Long>> input = ...;
input.groupBy(0).reduce( (a, b) -> new Tuple1<>(a.f0 + b.f0);
但是请注意,您的输入数据非常小,无论如何只有一个并行任务。如果你使用更大的输入,你可以看到并行pre-aggregation,例如:
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(10);
DataSet<Long> input = env.generateSequence(1, 100000000);
DataSet<Long> sum = input.reduce ( (a, b) -> a + b );
(2) 为什么reduce() 操作的结果仍然是DataSet?
DataSet 仍然是集群中 X 的惰性表示。您可以继续在并行程序中使用该数据,而无需触发某些计算并将结果数据从分布式工作程序取回驱动程序。这允许您编写更大的程序 运行 完全在分布式工作人员上并延迟执行。没有数据被提取到客户端和 re-distributed 到并行工作者。
特别是在迭代程序中,这非常强大,因为整个循环工作时无需涉及客户端,也不需要 re-deploy 操作员。
您始终可以通过调用 "dataSet.collext().get(0);" 来获取 "X" - 这明确表明应该执行和获取某些内容。
例如:
DataSet<Tuple1<Long>> input = env.fromElements(1,2,3,4,5,6,7,8,9);
DataSet<Tuple1<Long>> sum = input.reduce(new ReduceFunction()<Tuple1<Long>,Tuple1<Long>>{
public Tuple1<Long> reduce(Tuple1<Long> value1,Tuple1<Long> value2){
return new Tuple1<>(value1.f0 + value2.f0);
}
}
如果上面的reduce转换不是并行操作,我是否需要使用额外的两个转换'partitionByHash'和'mapPartition'如下:
DataSet<Tuple1<Long>> input = env.fromElements(1,2,3,4,5,6,7,8,9);
DataSet<Tuple1<Long>> sum = input.partitionByHash(0).mapPartition(new MapPartitionFunction()<Tuple1<Long>,Tuple1<Long>>{
public void map(Iterable<Tuple1<Long>> values,Collector<Tuple1<Long>> out){
long sum = getSum(values);
out.collect(new Tuple1(sum));
}
}).reduce(new ReduceFunction()<Tuple1<Long>,Tuple1<Long>>{
public Tuple1<Long> reduce(Tuple1<Long> value1,Tuple1<Long> value2){
return new Tuple1<>(value1.f0 + value2.f0);
}
}
以及为什么reduce转换的结果仍然是DataSet的实例而不是Tuple1<Long>
reduce
和 reduceGroup
都是 group-wise 操作,应用于记录组。如果不使用 groupBy
指定分组键,则数据集中的所有记录都属于同一组。因此,只有一个组,reduce
和reduceGroup
的最终结果不能并行计算。
如果 reduce 转换是可组合的(对于任何 ReduceFunction
和所有可组合的 GroupReduceFunction
都是如此),Flink 可以并行应用组合器。
你的两个问题的两个答案:
(1) 为什么 reduce() 不是并行的
Fabian 给了很好的解释。如果按键应用,则操作是并行的。否则只有 pre-aggregation 是平行的。
在你的第二个例子中,你通过引入一个键使其并行。除了使用 "mapPartition()" 的复杂解决方法,您还可以简单地编写 (Java 8 Style)
DataSet<Tuple1<Long>> input = ...;
input.groupBy(0).reduce( (a, b) -> new Tuple1<>(a.f0 + b.f0);
但是请注意,您的输入数据非常小,无论如何只有一个并行任务。如果你使用更大的输入,你可以看到并行pre-aggregation,例如:
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(10);
DataSet<Long> input = env.generateSequence(1, 100000000);
DataSet<Long> sum = input.reduce ( (a, b) -> a + b );
(2) 为什么reduce() 操作的结果仍然是DataSet?
DataSet 仍然是集群中 X 的惰性表示。您可以继续在并行程序中使用该数据,而无需触发某些计算并将结果数据从分布式工作程序取回驱动程序。这允许您编写更大的程序 运行 完全在分布式工作人员上并延迟执行。没有数据被提取到客户端和 re-distributed 到并行工作者。
特别是在迭代程序中,这非常强大,因为整个循环工作时无需涉及客户端,也不需要 re-deploy 操作员。
您始终可以通过调用 "dataSet.collext().get(0);" 来获取 "X" - 这明确表明应该执行和获取某些内容。