为什么 Java stream map reduce 计算我的结果两次?
Why does Java stream map reduce count my result twice?
我有这个代码:
ComparisonResults comparisonResults = requestsList
.stream()
.map(item -> getResponse(item))
.map(item -> compareToBl(item))
.reduce(new ComparisonResults(), (result1, result2) ->
{
result1.addSingleResult(result2);
// return result1;
return new ComparisonResults(result1);
});
和此代码 b:
ComparisonResults comparisonResults = requestsList
.parallelStream()
.map(item -> getResponse(item))
.map(item -> compareToBl(item))
.reduce(new ComparisonResults(), (result1, result2) ->
{
result1.addSingleResult(result2);
// return result1;
return new ComparisonResults(result1);
});
我所做的就是创建响应对象,然后将它们转换为 comaprisonResult 对象并将它们缩减为一个 comaprisonResult。
代码 a
显示一个 int class 成员 comparisonResults.num_of_sub_responses==5
是正确的
代码 b
显示一个 int class 成员 comparisonResults.num_of_sub_responses==10
是正确结果的两倍。
java 8 reduce应该是线程安全的吧?
我错过了什么吗?
getResponse
和 compareToBl
是线程安全的
您正在改变 reduce
中的传入对象。这是错误的。在 修改传入对象后 创建新对象并没有帮助。
你想做的,是
.collect(ComparisonResults::new, ComparisonResults::addSingleResult,
(a,b)->/* code to merge two ComparisonResults instances*/);
如果.map(item -> compareToBl(item))
的结果是ComparisonResults
,或者换句话说,addSingleResult
合并了两个ComparisonResults
实例,你可以使用ComparisonResults::addSingleResult
作为合并功能,虽然它的名字有点误导。
您应该仔细阅读 “Reduction” chapter of the documentation 及其后续文章“可变减少”。
reduce 操作 reduce(identity, operator)
依赖于关于您传递给它的参数的两个重要假设。
- 第一个参数是身份。也就是说,当您在任何项目和给定的
identity
之间使用归约运算符时,您将获得原始项目。
- 运算符是关联的。
这两个假设在使用并行流时非常重要,因为一般来说,它所做的是:
- 给每个线程一个流片段
每个线程以identity开始,使用stream中的元素累加一个结果:
result = identity op element1 op element2 op element3 ...
然后使用运算符将来自不同线程的结果组合在一起:
grand result = result1 op result2 op result3
因此,假设您要对数字求和,它会将 5 + 4 + 3 + 20
之类的运算分解为 ( 0 + 5 + 4 ) + ( 0 + 3 + 20 )
。当且仅当上述假设成立时,这才有效。 (0是加法的恒等式,加法是结合的)
但是通过改变运算符中的第一个操作数,这意味着您实际上是在改变 identity 对象。所以它不能再被视为 identity。那就是 op(identity,result)
不会给你与 result
本身相同的价值。
如果运算符不是关联的,问题会出现在"grand result"阶段。
我有这个代码:
ComparisonResults comparisonResults = requestsList
.stream()
.map(item -> getResponse(item))
.map(item -> compareToBl(item))
.reduce(new ComparisonResults(), (result1, result2) ->
{
result1.addSingleResult(result2);
// return result1;
return new ComparisonResults(result1);
});
和此代码 b:
ComparisonResults comparisonResults = requestsList
.parallelStream()
.map(item -> getResponse(item))
.map(item -> compareToBl(item))
.reduce(new ComparisonResults(), (result1, result2) ->
{
result1.addSingleResult(result2);
// return result1;
return new ComparisonResults(result1);
});
我所做的就是创建响应对象,然后将它们转换为 comaprisonResult 对象并将它们缩减为一个 comaprisonResult。
代码 a
显示一个 int class 成员 comparisonResults.num_of_sub_responses==5
是正确的
代码 b
显示一个 int class 成员 comparisonResults.num_of_sub_responses==10
是正确结果的两倍。
java 8 reduce应该是线程安全的吧?
我错过了什么吗?
getResponse
和 compareToBl
是线程安全的
您正在改变 reduce
中的传入对象。这是错误的。在 修改传入对象后 创建新对象并没有帮助。
你想做的,是
.collect(ComparisonResults::new, ComparisonResults::addSingleResult,
(a,b)->/* code to merge two ComparisonResults instances*/);
如果.map(item -> compareToBl(item))
的结果是ComparisonResults
,或者换句话说,addSingleResult
合并了两个ComparisonResults
实例,你可以使用ComparisonResults::addSingleResult
作为合并功能,虽然它的名字有点误导。
您应该仔细阅读 “Reduction” chapter of the documentation 及其后续文章“可变减少”。
reduce 操作 reduce(identity, operator)
依赖于关于您传递给它的参数的两个重要假设。
- 第一个参数是身份。也就是说,当您在任何项目和给定的
identity
之间使用归约运算符时,您将获得原始项目。 - 运算符是关联的。
这两个假设在使用并行流时非常重要,因为一般来说,它所做的是:
- 给每个线程一个流片段
每个线程以identity开始,使用stream中的元素累加一个结果:
result = identity op element1 op element2 op element3 ...
然后使用运算符将来自不同线程的结果组合在一起:
grand result = result1 op result2 op result3
因此,假设您要对数字求和,它会将 5 + 4 + 3 + 20
之类的运算分解为 ( 0 + 5 + 4 ) + ( 0 + 3 + 20 )
。当且仅当上述假设成立时,这才有效。 (0是加法的恒等式,加法是结合的)
但是通过改变运算符中的第一个操作数,这意味着您实际上是在改变 identity 对象。所以它不能再被视为 identity。那就是 op(identity,result)
不会给你与 result
本身相同的价值。
如果运算符不是关联的,问题会出现在"grand result"阶段。