Reduce for parallel stream without combiner 由多个线程正确执行。在这种情况下我什么时候应该使用组合器?

Reduce for parallel stream without combiner executes by several threads correctly. When should I use combiner at this case?

我已阅读以下内容:

并且我做出了合并器仅在并行流中使用的决议,以获得正确的合并累加器结果。每个线程一个累加器实例。

因此我做了 reduce 没有 combiner 的决议将无法正常工作。

为了检查这一点,我编写了以下示例:

   Person reduce = Person.getPersons().stream()
                .parallel() 
                .reduce(new Person(), (intermediateResult, p2) -> {
                    System.out.println(Thread.currentThread().getName());
                    return new Person("default", intermediateResult.getAge() + p2.getAge());
                });
        System.out.println(reduce);

型号:

public class Person {

    String name;

    Integer age;
    ///...

    public static Collection<Person> getPersons() {
        List<Person> persons = new ArrayList<>();
        persons.add(new Person("Vasya", 12));
        persons.add(new Person("Petya", 32));
        persons.add(new Person("Serj", 10));
        persons.add(new Person("Onotole", 18));
        return persons;
   }
}

如你所见,我不提供组合器
示例输出:

ForkJoinPool.commonPool-worker-3
ForkJoinPool.commonPool-worker-2
ForkJoinPool.commonPool-worker-1
ForkJoinPool.commonPool-worker-2
ForkJoinPool.commonPool-worker-1
ForkJoinPool.commonPool-worker-1
Person{name='default', age=72}

我已经多次执行应用程序并且总是看到正确的结果。

如果没有提供组合器,请解释如何减少并行流的工作。

在这种情况下,您的累加器也可以用作组合器。当缩减类型与流元素类型相同时,这是 shorthand。于是

myStream.reduce(identity, accumulator);

完全等同于

myStream.reduce(identity, accumulator, accumulator);

您甚至可以在 OpenJDK:

查看这些方法的源代码
@Override
public final <R> R reduce(R identity, BiFunction<R, ? super P_OUT, R> accumulator, 
                          BinaryOperator<R> combiner) {
    return evaluate(ReduceOps.makeRef(identity, accumulator, combiner));
}

@Override
public final P_OUT reduce(final P_OUT identity, final BinaryOperator<P_OUT> accumulator) {
    return evaluate(ReduceOps.makeRef(identity, accumulator, accumulator));
}

三参数版本更灵活,因为缩减操作可能会产生另一种类型的对象。在这种情况下,您不能使用双参数缩减,因为您没有提供如何组合结果类型的两个元素的规则。然而,当结果类型相同时,累加器和组合器在相同的对象类型上工作,因此如果它是关联的,它应该只是相同的操作。

已经指定了一个组合器。在这种情况下,组合器函数与您的 accumulator 函数相同。

如果结果类型与您的流元素类型相同,这总是可能的。

与通过对值求和进行归约相比,a+b+c+d 可能会通过计算 (a+b)+(c+d) 来并行评估。这里的累加器就是加法,和combiner函数处理(a+b)(c+d).

的中间结果是一样的操作

此示例还表明,除非涉及类型转换,否则如果您需要不同的组合器函数会很奇怪,因为累加器函数的关联性约束意味着它通常作为组合器函数就足够了。请记住,流是否计算 a+b+c+d(a+b+c)+d(a+b)+(c+d)a+(b+c+d).

应该是无关紧要的

3 参数 reduce 存在于一种不常见的情况,如下所示:

想象一下,您没有将 Person 的流减少为 Person,而是有一个不同的中间值,例如 PopulationStats

class PopulationStats {
  // make new stats that includes this person
  PopulationStats addPerson(Person p) {
    return new PopulationStats(........);
  }

  // make new stats that combines this and other stats
  PopulationStats addStats(PopulationStats other) {
    return new PopulationStats(........);
  } 
}

在这种情况下,3 参数 reduce 用于避免在减少之前为每个 Person 制作 PopulationStats 的中间步骤。

PopulationStats stats = people.stream()
  .reduce(new PopulationStats(), PopulationStats::addPerson, PopulationStats::addStats);