Apache Flink:为什么 reduce 或 groupReduce 转换不能并行运行?

Apache Flink: Why do reduce or groupReduce transformations not operate in parallel?

例如:

DataSet<Tuple1<Long>> input = env.fromElements(1,2,3,4,5,6,7,8,9);
DataSet<Tuple1<Long>> sum = input.reduce(new ReduceFunction()<Tuple1<Long>,Tuple1<Long>>{
   public Tuple1<Long> reduce(Tuple1<Long> value1,Tuple1<Long> value2){
      return new Tuple1<>(value1.f0 + value2.f0);
   }
}

如果上面的reduce转换不是并行操作,我是否需要使用额外的两个转换'partitionByHash'和'mapPartition'如下:

DataSet<Tuple1<Long>> input = env.fromElements(1,2,3,4,5,6,7,8,9);
DataSet<Tuple1<Long>> sum = input.partitionByHash(0).mapPartition(new MapPartitionFunction()<Tuple1<Long>,Tuple1<Long>>{
   public void map(Iterable<Tuple1<Long>> values,Collector<Tuple1<Long>> out){
      long sum = getSum(values);
      out.collect(new Tuple1(sum));
   }
}).reduce(new ReduceFunction()<Tuple1<Long>,Tuple1<Long>>{
   public Tuple1<Long> reduce(Tuple1<Long> value1,Tuple1<Long> value2){
      return new Tuple1<>(value1.f0 + value2.f0);
   }
}

以及为什么reduce转换的结果仍然是DataSet的实例而不是Tuple1<Long>

的实例

reducereduceGroup 都是 group-wise 操作,应用于记录组。如果不使用 groupBy 指定分组键,则数据集中的所有记录都属于同一组。因此,只有一个组,reducereduceGroup的最终结果不能并行计算。

如果 reduce 转换是可组合的(对于任何 ReduceFunction 和所有可组合的 GroupReduceFunction 都是如此),Flink 可以并行应用组合器。

你的两个问题的两个答案:

(1) 为什么 reduce() 不是并行的

Fabian 给了很好的解释。如果按键应用,则操作是并行的。否则只有 pre-aggregation 是平行的。

在你的第二个例子中,你通过引入一个键使其并行。除了使用 "mapPartition()" 的复杂解决方法,您还可以简单地编写 (Java 8 Style)

DataSet<Tuple1<Long>> input = ...;
input.groupBy(0).reduce( (a, b) -> new Tuple1<>(a.f0 + b.f0);

但是请注意,您的输入数据非常小,无论如何只有一个并行任务。如果你使用更大的输入,你可以看到并行pre-aggregation,例如:

ExecutionEnvironment env =     ExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(10);

DataSet<Long> input = env.generateSequence(1, 100000000);
DataSet<Long> sum = input.reduce ( (a, b) -> a + b );

(2) 为什么reduce() 操作的结果仍然是DataSet?

DataSet 仍然是集群中 X 的惰性表示。您可以继续在并行程序中使用该数据,而无需触发某些计算并将结果数据从分布式工作程序取回驱动程序。这允许您编写更大的程序 运行 完全在分布式工作人员上并延迟执行。没有数据被提取到客户端和 re-distributed 到并行工作者。

特别是在迭代程序中,这非常强大,因为整个循环工作时无需涉及客户端,也不需要 re-deploy 操作员。

您始终可以通过调用 "dataSet.collext().get(0);" 来获取 "X" - 这明确表明应该执行和获取某些内容。