Stream.reduce() 和 Stream.collect() 之间令人惊讶的性能差异
Surprising performance differences between Stream.reduce() and Stream.collect()
我想比较两个 Java8 流终端操作 reduce()
和 collect()
的并行性能。
让我们看看下面的Java8并行流示例:
import java.math.BigInteger;
import java.util.function.BiConsumer;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Stream;
import static java.math.BigInteger.ONE;
public class StartMe {
static Function<Long, BigInteger> fac;
static {
fac = x -> x==0? ONE : BigInteger.valueOf(x).multiply(fac.apply(x - 1));
}
static long N = 2000;
static Supplier<BigInteger[]> one() {
BigInteger[] result = new BigInteger[1];
result[0] = ONE;
return () -> result;
}
static BiConsumer<BigInteger[], ? super BigInteger> accumulator() {
return (BigInteger[] ba, BigInteger b) -> {
synchronized (fac) {
ba[0] = ba[0].multiply(b);
}
};
}
static BiConsumer<BigInteger[], BigInteger[]> combiner() {
return (BigInteger[] b1, BigInteger[] b2) -> {};
}
public static void main(String[] args) throws Exception {
long t0 = System.currentTimeMillis();
BigInteger result1 = Stream.iterate(ONE, x -> x.add(ONE)).parallel().limit(N).reduce(ONE, BigInteger::multiply);
long t1 = System.currentTimeMillis();
BigInteger[] result2 = Stream.iterate(ONE, x -> x.add(ONE)).parallel().limit(N).collect(one(), accumulator(), combiner());
long t2 = System.currentTimeMillis();
BigInteger result3 = fac.apply(N);
long t3 = System.currentTimeMillis();
System.out.println("reduce(): deltaT = " + (t1-t0) + "ms, result 1 = " + result1);
System.out.println("collect(): deltaT = " + (t2-t1) + "ms, result 2 = " + result2[0]);
System.out.println("recursive: deltaT = " + (t3-t2) + "ms, result 3 = " + result3);
}
}
它计算 n!使用一些 - 诚然很奇怪 ;-) - 算法。
性能结果却令人惊讶:
reduce(): deltaT = 44ms, result 1 = 3316275...
collect(): deltaT = 22ms, result 2 = 3316275...
recursive: deltaT = 11ms, result 3 = 3316275...
一些评论:
- 我必须同步
accumulator()
因为它并行访问同一个数组。
- 我预计
reduce()
和 collect()
会产生相同的性能,但 reduce()
比 collect()
慢约 2 倍,即使 collect()
必须是已同步!
- 最快的算法是顺序递归算法(这可能显示并行流管理的巨大开销)
没想到reduce()
的表现会比collect()
差。为什么会这样?
基本上,您是在测量第一次执行的代码的初始开销。不仅优化器还没有任何工作,您正在测量加载、验证和初始化 类.
的开销
因此,评估时间减少也就不足为奇了,因为每次评估都可以重用 类 已经为之前的评估加载。 运行 循环中的所有三个评估,甚至只是改变顺序都会给你一个完全不同的画面。
唯一可预测的结果是简单的递归计算将具有最小的初始开销,因为它不需要加载 Stream
API 类.
如果您多次 运行 代码,或者更好,使用复杂的基准测试工具,我想您会得到与我类似的结果,其中 reduce
明显优于 collect
并且确实比单线程方法快。
collect
变慢的原因是您完全错误地使用了它。将为每个线程查询 Supplier
以获得不同的容器,因此累加器函数 不需要 需要任何额外的同步。但重要的是组合器函数正确地工作以将不同线程的结果容器连接成一个结果。
正确的方法是:
BigInteger[] result2 = Stream.iterate(ONE, x -> x.add(ONE)).parallel().limit(N)
.collect(()->new BigInteger[]{ONE},
(a,v)->a[0]=a[0].multiply(v), (a,b)->a[0]=a[0].multiply(b[0]));
在我的系统上,它的性能与 reduce
方法相当。由于使用数组作为可变容器不能改变 BigInteger
的不可变性质,因此在这里使用 collect
没有任何优势,使用 reduce
是直截了当的,并且如前所述,正确使用这两种方法时性能相同。
顺便说一句,我不明白为什么这么多程序员试图创建自引用 lambda 表达式。递归函数的直接方法仍然是方法:
static BigInteger fac(long x) {
return x==0? ONE : BigInteger.valueOf(x).multiply(fac(x - 1));
}
static final Function<Long, BigInteger> fac=StartMe::fac;
(尽管在您的代码中,您根本不需要 Function<Long, BigInteger>
,直接调用 fac(long)
即可)。
最后一点,Stream.iterate
和 Stream.limit
都不利于并行执行。使用具有可预测大小和独立操作的流将显着优于您的解决方案:
BigInteger result4 = LongStream.rangeClosed(1, N).parallel()
.mapToObj(BigInteger::valueOf).reduce(BigInteger::multiply).orElse(ONE);
我想比较两个 Java8 流终端操作 reduce()
和 collect()
的并行性能。
让我们看看下面的Java8并行流示例:
import java.math.BigInteger;
import java.util.function.BiConsumer;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Stream;
import static java.math.BigInteger.ONE;
public class StartMe {
static Function<Long, BigInteger> fac;
static {
fac = x -> x==0? ONE : BigInteger.valueOf(x).multiply(fac.apply(x - 1));
}
static long N = 2000;
static Supplier<BigInteger[]> one() {
BigInteger[] result = new BigInteger[1];
result[0] = ONE;
return () -> result;
}
static BiConsumer<BigInteger[], ? super BigInteger> accumulator() {
return (BigInteger[] ba, BigInteger b) -> {
synchronized (fac) {
ba[0] = ba[0].multiply(b);
}
};
}
static BiConsumer<BigInteger[], BigInteger[]> combiner() {
return (BigInteger[] b1, BigInteger[] b2) -> {};
}
public static void main(String[] args) throws Exception {
long t0 = System.currentTimeMillis();
BigInteger result1 = Stream.iterate(ONE, x -> x.add(ONE)).parallel().limit(N).reduce(ONE, BigInteger::multiply);
long t1 = System.currentTimeMillis();
BigInteger[] result2 = Stream.iterate(ONE, x -> x.add(ONE)).parallel().limit(N).collect(one(), accumulator(), combiner());
long t2 = System.currentTimeMillis();
BigInteger result3 = fac.apply(N);
long t3 = System.currentTimeMillis();
System.out.println("reduce(): deltaT = " + (t1-t0) + "ms, result 1 = " + result1);
System.out.println("collect(): deltaT = " + (t2-t1) + "ms, result 2 = " + result2[0]);
System.out.println("recursive: deltaT = " + (t3-t2) + "ms, result 3 = " + result3);
}
}
它计算 n!使用一些 - 诚然很奇怪 ;-) - 算法。
性能结果却令人惊讶:
reduce(): deltaT = 44ms, result 1 = 3316275...
collect(): deltaT = 22ms, result 2 = 3316275...
recursive: deltaT = 11ms, result 3 = 3316275...
一些评论:
- 我必须同步
accumulator()
因为它并行访问同一个数组。 - 我预计
reduce()
和collect()
会产生相同的性能,但reduce()
比collect()
慢约 2 倍,即使collect()
必须是已同步! - 最快的算法是顺序递归算法(这可能显示并行流管理的巨大开销)
没想到reduce()
的表现会比collect()
差。为什么会这样?
基本上,您是在测量第一次执行的代码的初始开销。不仅优化器还没有任何工作,您正在测量加载、验证和初始化 类.
的开销因此,评估时间减少也就不足为奇了,因为每次评估都可以重用 类 已经为之前的评估加载。 运行 循环中的所有三个评估,甚至只是改变顺序都会给你一个完全不同的画面。
唯一可预测的结果是简单的递归计算将具有最小的初始开销,因为它不需要加载 Stream
API 类.
如果您多次 运行 代码,或者更好,使用复杂的基准测试工具,我想您会得到与我类似的结果,其中 reduce
明显优于 collect
并且确实比单线程方法快。
collect
变慢的原因是您完全错误地使用了它。将为每个线程查询 Supplier
以获得不同的容器,因此累加器函数 不需要 需要任何额外的同步。但重要的是组合器函数正确地工作以将不同线程的结果容器连接成一个结果。
正确的方法是:
BigInteger[] result2 = Stream.iterate(ONE, x -> x.add(ONE)).parallel().limit(N)
.collect(()->new BigInteger[]{ONE},
(a,v)->a[0]=a[0].multiply(v), (a,b)->a[0]=a[0].multiply(b[0]));
在我的系统上,它的性能与 reduce
方法相当。由于使用数组作为可变容器不能改变 BigInteger
的不可变性质,因此在这里使用 collect
没有任何优势,使用 reduce
是直截了当的,并且如前所述,正确使用这两种方法时性能相同。
顺便说一句,我不明白为什么这么多程序员试图创建自引用 lambda 表达式。递归函数的直接方法仍然是方法:
static BigInteger fac(long x) {
return x==0? ONE : BigInteger.valueOf(x).multiply(fac(x - 1));
}
static final Function<Long, BigInteger> fac=StartMe::fac;
(尽管在您的代码中,您根本不需要 Function<Long, BigInteger>
,直接调用 fac(long)
即可)。
最后一点,Stream.iterate
和 Stream.limit
都不利于并行执行。使用具有可预测大小和独立操作的流将显着优于您的解决方案:
BigInteger result4 = LongStream.rangeClosed(1, N).parallel()
.mapToObj(BigInteger::valueOf).reduce(BigInteger::multiply).orElse(ONE);