Reduce returns unpredictable results for parallel stream
I wrote the following code example using Java Stream reduce:
Person reducedPerson = Person.getPersons().stream()
        .parallel() // will return surprising result
        .reduce(new Person(), (intermediateResult, p2) -> {
                    intermediateResult.setAge(intermediateResult.getAge() + p2.getAge());
                    return intermediateResult;
                },
                (ir1, ir2) -> {
                    ir1.setAge(ir1.getAge() + ir2.getAge());
                    return ir1;
                });
System.out.println(reducedPerson);
Model:
public class Person {
    String name;
    Integer age;

    public Person() {
        age = 0;
        name = "default";
    }
    //...

    public Person(String name, Integer age) {
        this.name = name;
        this.age = age;
    }

    public static Collection<Person> getPersons() {
        List<Person> persons = new ArrayList<>();
        persons.add(new Person("Vasya", 12));
        persons.add(new Person("Petya", 32));
        persons.add(new Person("Serj", 10));
        persons.add(new Person("Onotole", 18));
        return persons;
    }
}
Each run of this code example returns a different result.
Examples:
Person{name='default', age=256}
or
Person{name='default', age=248}
I have narrowed the problem down to the combiner, because the same code runs correctly on a sequential stream.
Please help me correct the combiner.
P.S.
Expected result: a person with name 'default' and age 72 (the sum of the ages of everyone in the list)
P.S.
The same code with Integer as the reduce result works correctly:
Integer age = Person.getPersons().stream()
        .parallel()
        .reduce(0, (intermediateResult, p2) -> {
            intermediateResult = intermediateResult + p2.getAge();
            return intermediateResult;
        }, (ir1, ir2) -> {
            System.out.println("combiner");
            ir1 = ir1 + ir2;
            return ir1;
        });
System.out.println(age);
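As Boris said, the problem is mutation inside the stream. reduce takes the identity as a single value, so the one new Person() object is handed to every thread, and the accumulator and combiner all modify that same shared instance.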
Most stream operations accept parameters that describe user-specified
behavior, such as the lambda expression w -> w.getWeight() passed to
mapToInt in the example above. To preserve correct behavior, these
behavioral parameters:
- must be non-interfering (they do not modify the stream source); and
- in most cases must be stateless (their result should not depend on any
state that might change during execution of the stream pipeline).
https://docs.oracle.com/javase/8/docs/api/java/util/stream/Stream.html
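To make the shared-state point concrete, here is a minimal sketch. The class names SharedIdentityDemo and MutablePerson are hypothetical stand-ins, not code from the question. It uses the same mutating accumulator and combiner as the question and checks whether the object returned by reduce is the identity object itself.

```java
import java.util.Arrays;
import java.util.List;

class SharedIdentityDemo {
    // Hypothetical stand-in for the question's mutable Person.
    static final class MutablePerson {
        private int age;
        int getAge() { return age; }
        void setAge(int age) { this.age = age; }
    }

    // Returns true when the object handed back by reduce is the identity instance.
    static boolean resultIsIdentity() {
        List<Integer> ages = Arrays.asList(12, 32, 10, 18);
        MutablePerson identity = new MutablePerson();
        MutablePerson result = ages.parallelStream().reduce(
                identity,
                // Mutates and returns its first argument, as in the question.
                (acc, age) -> { acc.setAge(acc.getAge() + age); return acc; },
                // Also mutates and returns its first argument.
                (left, right) -> { left.setAge(left.getAge() + right.getAge()); return left; });
        return result == identity;
    }

    public static void main(String[] args) {
        // prints "result is the identity instance: true"
        System.out.println("result is the identity instance: " + resultIsIdentity());
    }
}
```

Because every intermediate result is the same object, the combiner ends up adding that object's age to itself, which would explain inflated totals such as 248 or 256 instead of 72. The age value itself is deliberately not printed here, since it varies from run to run.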
Here is a version that uses reduce, along with a more direct version that uses mapToInt and sum.
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;

class gWhosebug {
    public static void main(String... args) {
        Person reducedPerson = Person.getPersons().stream()
                .parallel() // will NOT return surprising result
                .reduce(new Person("default", 0),
                        (ir1, ir2) -> // no longer mutates; builds a new Person each time
                                new Person(String.join(",", ir1.getName(), ir2.getName()), ir1.getAge() + ir2.getAge())
                );
        // The age is always 72. The joined name can contain "default" more than once
        // in parallel, because the identity seeds every chunk of the stream.
        System.out.println(reducedPerson);

        // here is a clean(er) way to do it:
        int totalAge = Person.getPersons().stream()
                .parallel() // will NOT return surprising result
                .mapToInt(Person::getAge)
                .sum();
        System.out.println(totalAge);
    }
}
class Person { // no longer mutable
    public String getName() {
        return name;
    }

    public Integer getAge() {
        return age;
    }

    final String name;
    final Integer age;

    // no args constructor removed
    public Person(String name, Integer age) {
        this.name = name;
        this.age = age;
    }

    public static Collection<Person> getPersons() {
        List<Person> persons = new ArrayList<>();
        persons.add(new Person("Vasya", 12));
        persons.add(new Person("Petya", 32));
        persons.add(new Person("Serj", 10));
        persons.add(new Person("Onotole", 18));
        return persons;
    }

    @Override
    public String toString() {
        final StringBuilder sb = new StringBuilder("Person{");
        sb.append("name='").append(name).append('\'');
        sb.append(", age=").append(age);
        sb.append('}');
        return sb.toString();
    }
}
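If the goal is exactly the result the question asks for (name 'default', age 72), one option is to sum the ages with an immutable reduction and build the Person once at the end. This is only a sketch: SumThenBuildDemo, ImmutablePerson and samplePersons are hypothetical names that mirror the immutable Person above.

```java
import java.util.Arrays;
import java.util.List;

class SumThenBuildDemo {
    // Hypothetical immutable stand-in for the Person class above.
    static final class ImmutablePerson {
        private final String name;
        private final int age;

        ImmutablePerson(String name, int age) {
            this.name = name;
            this.age = age;
        }

        String getName() { return name; }
        int getAge() { return age; }

        @Override
        public String toString() {
            return "Person{name='" + name + "', age=" + age + "}";
        }
    }

    // Same sample data as getPersons() in the question.
    static List<ImmutablePerson> samplePersons() {
        return Arrays.asList(
                new ImmutablePerson("Vasya", 12),
                new ImmutablePerson("Petya", 32),
                new ImmutablePerson("Serj", 10),
                new ImmutablePerson("Onotole", 18));
    }

    static ImmutablePerson reduceToDefault(List<ImmutablePerson> persons) {
        // 0 is a true identity for Integer::sum, and Integer values are immutable,
        // so the parallel reduction has no shared mutable state.
        int totalAge = persons.parallelStream()
                .map(ImmutablePerson::getAge)
                .reduce(0, Integer::sum);
        return new ImmutablePerson("default", totalAge);
    }

    public static void main(String[] args) {
        System.out.println(reduceToDefault(samplePersons())); // Person{name='default', age=72}
    }
}
```

Building the result object outside the stream keeps the reduction itself on plain integers, so the name never passes through the accumulator.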
To perform a mutable reduction, use collect:
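// Uses the mutable Person from the question (no-arg constructor and setAge).
reducedPerson = Person.getPersons().parallelStream()
        .collect(
                Person::new,                                  // supplier: a fresh container per chunk
                (p, q) -> p.setAge(p.getAge() + q.getAge()),  // accumulator: add one element
                (p, q) -> p.setAge(p.getAge() + q.getAge())   // combiner: merge two containers
        );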
collect is designed specifically to accumulate safely into mutable containers, even in parallel.
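For a version that can be run on its own, here is a minimal sketch of the same collect call. CollectDemo, MutablePerson and samplePersons are hypothetical names, and the setter is assumed to return void as in a conventional bean.

```java
import java.util.Arrays;
import java.util.List;

class CollectDemo {
    // Hypothetical mutable stand-in for the question's Person.
    static final class MutablePerson {
        private final String name;
        private int age;

        MutablePerson() {
            this("default", 0);
        }

        MutablePerson(String name, int age) {
            this.name = name;
            this.age = age;
        }

        String getName() { return name; }
        int getAge() { return age; }
        void setAge(int age) { this.age = age; }
    }

    // Same sample data as getPersons() in the question.
    static List<MutablePerson> samplePersons() {
        return Arrays.asList(
                new MutablePerson("Vasya", 12),
                new MutablePerson("Petya", 32),
                new MutablePerson("Serj", 10),
                new MutablePerson("Onotole", 18));
    }

    static MutablePerson sumAges(List<MutablePerson> persons) {
        return persons.parallelStream().collect(
                MutablePerson::new,                                        // new container per chunk
                (acc, p) -> acc.setAge(acc.getAge() + p.getAge()),         // fold one element in
                (left, right) -> left.setAge(left.getAge() + right.getAge())); // merge containers
    }

    public static void main(String[] args) {
        MutablePerson total = sumAges(samplePersons());
        System.out.println(total.getName() + " " + total.getAge()); // default 72
    }
}
```

The difference from reduce is the first argument: collect takes a supplier and calls it whenever it needs a new container, so no two chunks share one, while reduce takes a single identity value.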