Reduce returns unpredictable results for parallel stream

I wrote the following code sample using Java stream reduce:

Person reducedPerson = Person.getPersons().stream()
                .parallel()  //will return surprising result
                .reduce(new Person(), (intermediateResult, p2) -> {
                            intermediateResult.setAge(intermediateResult.getAge() + p2.getAge());
                            return intermediateResult;
                        },
                        (ir1, ir2) -> {
                            ir1.setAge(ir1.getAge() + ir2.getAge());
                            return ir1;
                        });
        System.out.println(reducedPerson);

Model:

public class Person {

    String name;

    Integer age;

    public Person() {
        age = 0;
        name = "default";
    }

    //...
    public Person(String name, Integer age) {
        this.name = name;
        this.age = age;
    }

    public static Collection<Person> getPersons() {
        List<Person> persons = new ArrayList<>();
        persons.add(new Person("Vasya", 12));
        persons.add(new Person("Petya", 32));
        persons.add(new Person("Serj", 10));
        persons.add(new Person("Onotole", 18));
        return persons;
    }
}

Each run of the code sample returns a different result:

For example: Person{name='default', age=256} or

Person{name='default', age=248}

I have narrowed the problem down to the combiner, because the same code runs correctly on a sequential stream.

Please help me correct the combiner.

P.S.

Expected result: a person with name 'default' and age 72 (the sum of the ages of everyone in the list)

P.S.

The same code with Integer as the reduce result works correctly:

Integer age = Person.getPersons().stream()
                .parallel()
                .reduce(0, (intermediateResult, p2) -> {
                    intermediateResult = intermediateResult + p2.getAge();
                    return intermediateResult;
                }, (ir1, ir2) -> {
                    System.out.println("combiner");
                    ir1 = ir1 + ir2;
                    return ir1;
                });
        System.out.println(age);

As Boris said, the problem is the mutation inside the stream.

Most stream operations accept parameters that describe user-specified behavior, such as the lambda expression w -> w.getWeight() passed to mapToInt in the example above. To preserve correct behavior, these behavioral parameters:

  • must be non-interfering (they do not modify the stream source); and
  • in most cases must be stateless (their result should not depend on any state that might change during execution of the stream pipeline).

https://docs.oracle.com/javase/8/docs/api/java/util/stream/Stream.html
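To make the failure concrete, here is a minimal single-threaded sketch of what a parallel reduce may do with the question's lambdas. It assumes the stream is split into two chunks and that each chunk starts from the same identity object, which is how reduce treats its identity argument. SharedIdentityDemo, MutablePerson and simulateTwoChunks are hypothetical names used only for this illustration. The real parallel run also involves races between threads, which is why the actual numbers differ from run to run.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical demo class: replays, on one thread, the steps a parallel reduce may take.
class SharedIdentityDemo {

    // Minimal mutable stand-in for the question's Person (name omitted).
    static class MutablePerson {
        int age;
        MutablePerson(int age) { this.age = age; }
    }

    // Same shape as the question's accumulator and combiner:
    // mutates the first argument and returns it.
    static MutablePerson addInto(MutablePerson result, MutablePerson p) {
        result.age += p.age;
        return result;
    }

    // Assumption: the source is split into two chunks, and both chunks
    // start from the SAME identity instance.
    static int simulateTwoChunks() {
        List<MutablePerson> left = Arrays.asList(new MutablePerson(12), new MutablePerson(32));
        List<MutablePerson> right = Arrays.asList(new MutablePerson(10), new MutablePerson(18));

        MutablePerson identity = new MutablePerson(0); // one object, reused by every chunk

        MutablePerson leftResult = identity;
        for (MutablePerson p : left) {
            leftResult = addInto(leftResult, p);
        }

        MutablePerson rightResult = identity; // already holds the left chunk's total (44)
        for (MutablePerson p : right) {
            rightResult = addInto(rightResult, p);
        }

        // Combiner step: both arguments are the same object (age 72),
        // so the total is added to itself.
        return addInto(leftResult, rightResult).age;
    }

    public static void main(String[] args) {
        System.out.println(simulateTwoChunks()); // prints 144 instead of the expected 72
    }
}
```

Even without any thread interference the shared identity already doubles the total, so changing the combiner alone cannot fix the original code.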

Here is a version that uses reduce, along with a more direct version that uses mapToInt and sum.

import java.util.ArrayList;
import java.util.Collection;
import java.util.List;

class gWhosebug{
  public static void main(String... args) {
    Person reducedPerson = Person.getPersons().stream()
        .parallel()  //age is now always 72; the joined name can still vary with how the stream is split
        .reduce(new Person("default",0),
            (ir1, ir2) -> //no longer mutates
                new Person(String.join(",", ir1.getName(), ir2.getName()), ir1.getAge() + ir2.getAge())
        );
    System.out.println(reducedPerson);

    //here is a clean(er) way to do it:
    int totalAge = Person.getPersons().stream()
        .parallel()  //will NOT return surprising result
        .mapToInt(Person::getAge)
        .sum();
    System.out.println(totalAge);
  }
}

class Person {//no longer mutable

  public String getName() {
    return name;
  }

  public Integer getAge() {
    return age;
  }

  final String name;

  final Integer age;

  //no args constructor removed
  public Person(String name, Integer age) {
    this.name = name;
    this.age = age;
  }

  public static Collection<Person> getPersons() {
    List<Person> persons = new ArrayList<>();
    persons.add(new Person("Vasya", 12));
    persons.add(new Person("Petya", 32));
    persons.add(new Person("Serj", 10));
    persons.add(new Person("Onotole", 18));
    return persons;
  }
  @Override
  public String toString() {
    final StringBuilder sb = new StringBuilder("Person{");
    sb.append("name='").append(name).append('\'');
    sb.append(", age=").append(age);
    sb.append('}');
    return sb.toString();
  }
}
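If the three-argument form of reduce from the question is preferred, the same idea applies to both lambdas: return a new object instead of mutating an argument. The following is a minimal sketch under that assumption. ImmutableReduceDemo and reduceSample are hypothetical names, and the nested Person is a simplified stand-in for the immutable class above.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical demo class for a three-argument reduce without mutation.
class ImmutableReduceDemo {

    // Simplified immutable stand-in for the Person class above.
    static final class Person {
        final String name;
        final int age;
        Person(String name, int age) {
            this.name = name;
            this.age = age;
        }
    }

    static Person reduceSample() {
        List<Person> persons = Arrays.asList(
                new Person("Vasya", 12),
                new Person("Petya", 32),
                new Person("Serj", 10),
                new Person("Onotole", 18));

        return persons.parallelStream()
                .reduce(new Person("default", 0),
                        // accumulator: builds a new partial result, leaves its arguments untouched
                        (result, p) -> new Person(result.name, result.age + p.age),
                        // combiner: merges two partial results into a third object
                        (r1, r2) -> new Person(r1.name, r1.age + r2.age));
    }

    public static void main(String[] args) {
        Person total = reduceSample();
        System.out.println(total.name + " " + total.age); // prints "default 72"
    }
}
```

Because the shared identity object is never modified, it does not matter how many chunks start from it.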

To perform a mutable reduction, use collect:

reducedPerson = Person.getPersons().parallelStream()
        .collect(
                Person::new,
                (p, q) -> p.setAge(p.getAge() + q.getAge()),
                (p, q) -> p.setAge(p.getAge() + q.getAge())
        );

collect is specifically designed to accumulate safely into a mutable container, even in parallel.
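The difference from reduce is that collect takes a supplier rather than a single identity value, so each chunk of a parallel run can get its own fresh container. Here is a self-contained sketch of that pattern, assuming a small dedicated accumulator instead of reusing Person as the container. CollectDemo, AgeTotal and totalAge are hypothetical names for this illustration.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical demo class for mutable reduction with collect.
class CollectDemo {

    // Simplified stand-in for the question's Person.
    static class Person {
        final String name;
        final int age;
        Person(String name, int age) {
            this.name = name;
            this.age = age;
        }
    }

    // Hypothetical mutable container; collect creates one per chunk through the supplier.
    static class AgeTotal {
        int total;
        void add(Person p) { total += p.age; }
        void merge(AgeTotal other) { total += other.total; }
    }

    static int totalAge() {
        List<Person> persons = Arrays.asList(
                new Person("Vasya", 12),
                new Person("Petya", 32),
                new Person("Serj", 10),
                new Person("Onotole", 18));

        AgeTotal result = persons.parallelStream()
                .collect(AgeTotal::new,    // supplier: a fresh container each time it is called
                         AgeTotal::add,    // accumulator: folds one element into a container
                         AgeTotal::merge); // combiner: folds the second container into the first
        return result.total;
    }

    public static void main(String[] args) {
        System.out.println(totalAge()); // prints 72
    }
}
```

For a plain sum, mapToInt(...).sum() from the earlier answer remains the shorter option; the container form pays off when several values have to be accumulated together.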