为什么原始 Stream 没有 collect(Collector)?

Why don't primitive Stream have collect(Collector)?

我正在为新手程序员编写一个库,所以我试图保持 API 尽可能干净。

我的图书馆需要做的一件事是对大量整数或长整数执行一些复杂的计算。我的用户需要从很多场景和业务对象中计算这些值,所以我认为最好的方法是使用流来允许用户将业务对象映射到 IntStreamLongStream,然后计算收集器内部的计算。

然而 IntStream 和 LongStream 只有 3 个参数的收集方法:

collect(Supplier<R> supplier, ObjIntConsumer<R> accumulator, BiConsumer<R,R> combiner)

并且没有 Stream<T> 所具有的更简单的 collect(Collector) 方法。

所以而不是能够做到

Collection<T> businessObjs = ...
MyResult result = businessObjs.stream()
                              .mapToInt( ... )
                              .collect( new MyComplexComputation(...));

我必须像这样提供供应商、累加器和组合器:

MyResult result = businessObjs.stream()
                              .mapToInt( ... )
                              .collect( 
                                  ()-> new MyComplexComputationBuilder(...),
                                  (builder, v)-> builder.add(v),
                                  (a,b)-> a.merge(b))
                              .build(); //prev collect returns Builder object

这对我的新手用户来说太复杂了,而且很容易出错。

我的解决方法是制作静态方法,将 IntStreamLongStream 作为输入并为您隐藏收集器的创建和执行

public static MyResult compute(IntStream stream, ...){
       return .collect( 
                        ()-> new MyComplexComputationBuilder(...),
                        (builder, v)-> builder.add(v),
                        (a,b)-> a.merge(b))
               .build();
}

但这不符合使用 Streams 的常规惯例:

IntStream tmpStream = businessObjs.stream()
                              .mapToInt( ... );

 MyResult result = MyUtil.compute(tmpStream, ...);

因为您必须保存一个临时变量并将其传递给静态方法,或者在静态调用中创建 Stream,当它与我的计算的其他参数混合在一起时可能会造成混淆。

在仍然使用 IntStreamLongStream 的同时,是否有更简洁的方法来执行此操作?

如果缺少某些方法,请将原始流转换为盒装对象流。

MyResult result = businessObjs.stream()
                          .mapToInt( ... )
                          .boxed()
                          .collect( new MyComplexComputation(...));

或者一开始就不要使用原始流,而是一直使用 Integers。

MyResult result = businessObjs.stream()
                          .map( ... )     // map to Integer not int
                          .collect( new MyComplexComputation(...));

我们实际上做了一些 Collector.OfXxx 专业化的原型。我们发现——除了更专业类型的明显烦恼之外——如果没有完整的原始专业集合(就像 Trove 或 GS​​-Collections,但 JDK没有)。例如,如果没有 IntArrayList,Collector.OfInt 只会将装箱推到其他地方——从收集器到容器——这没什么大不了的,还有更多的 API 表面。

先生Geotz ,但是,我想进一步调查这个决定对性能的影响有多大。

我想我会 post 我的结果作为答案。

我使用 jmh microbenchmark framework 来计算使用两种收集器计算大小为 1、100、1000、100,000 和 100 万的集合所需的计算时间:

@BenchmarkMode(Mode.AverageTime)
@OutputTimeUnit(TimeUnit.NANOSECONDS)
@State(Scope.Thread)
public class MyBenchmark {

@Param({"1", "100", "1000", "100000", "1000000"})
public int size;

List<BusinessObj> seqs;

@Setup
public void setup(){
    seqs = new ArrayList<BusinessObj>(size);
    Random rand = new Random();
    for(int i=0; i< size; i++){
        //these lengths are random but over 128 so no caching of Longs
        seqs.add(BusinessObjFactory.createOfRandomLength());
    }
}
@Benchmark
public double objectCollector() {       

    return seqs.stream()
                .map(BusinessObj::getLength)
                .collect(MyUtil.myCalcLongCollector())
                .getAsDouble();
}

@Benchmark
public double primitiveCollector() {

    LongStream stream= seqs.stream()
                                    .mapToLong(BusinessObj::getLength);
    return MyUtil.myCalc(stream)        
                        .getAsDouble();
}

public static void main(String[] args) throws RunnerException{
    Options opt = new OptionsBuilder()
                        .include(MyBenchmark.class.getSimpleName())
                        .build();

    new Runner(opt).run();
}

}

结果如下:

# JMH 1.9.3 (released 4 days ago)
# VM invoker: /Library/Java/JavaVirtualMachines/jdk1.8.0_31.jdk/Contents/Home/jre/bin/java
# VM options: <none>
# Warmup: 20 iterations, 1 s each
# Measurement: 20 iterations, 1 s each
# Timeout: 10 min per iteration
# Threads: 1 thread, will synchronize iterations
# Benchmark mode: Average time, time/op
# Benchmark: org.sample.MyBenchmark.objectCollector

# Run complete. Total time: 01:30:31

Benchmark                        (size)  Mode  Cnt          Score         Error  Units
MyBenchmark.objectCollector           1  avgt  200        140.803 ±       1.425  ns/op
MyBenchmark.objectCollector         100  avgt  200       5775.294 ±      67.871  ns/op
MyBenchmark.objectCollector        1000  avgt  200      70440.488 ±    1023.177  ns/op
MyBenchmark.objectCollector      100000  avgt  200   10292595.233 ±  101036.563  ns/op
MyBenchmark.objectCollector     1000000  avgt  200  100147057.376 ±  979662.707  ns/op
MyBenchmark.primitiveCollector        1  avgt  200        140.971 ±       1.382  ns/op
MyBenchmark.primitiveCollector      100  avgt  200       4654.527 ±      87.101  ns/op
MyBenchmark.primitiveCollector     1000  avgt  200      60929.398 ±    1127.517  ns/op
MyBenchmark.primitiveCollector   100000  avgt  200    9784655.013 ±  113339.448  ns/op
MyBenchmark.primitiveCollector  1000000  avgt  200   94822089.334 ± 1031475.051  ns/op

如您所见,原始 Stream 版本稍微快一些,但即使集合中有 100 万个元素,它也只快 0.05 秒(平均)。

对于我的 API 我宁愿保持更清晰的对象流约定并使用盒装版本,因为它的性能损失很小。

感谢所有深入了解此问题的人。

我在我的库 StreamEx (since version 0.3.0). There are interfaces IntCollector, LongCollector and DoubleCollector 中实现了原语收集器,它扩展了 Collector 接口并专门用于处理原语。组合过程还有一个细微差别,因为像 IntStream.collect 这样的方法接受 BiConsumer 而不是 BinaryOperator.

有一堆预定义的集合方法将数字连接到字符串,存储到原始数组,到BitSet,查找最小值、最大值、总和,计算汇总统计信息,执行分组依据和分区依据操作。当然,您可以定义自己的收集器。这里有几个使用示例(假设您有 int[] input 包含输入数据的数组)。

将数字作为带分隔符的字符串加入:

String nums = IntStreamEx.of(input).collect(IntCollector.joining(","));

按最后一位分组:

Map<Integer, int[]> groups = IntStreamEx.of(input)
      .collect(IntCollector.groupingBy(i -> i % 10));

正负数分别求和:

Map<Boolean, Integer> sums = IntStreamEx.of(input)
      .collect(IntCollector.partitioningBy(i -> i > 0, IntCollector.summing()));

这是一个简单的benchmark,比较了这些收集器和普通的对象收集器。

请注意,我的库不提供(将来也不会提供)任何用户可见的数据结构,例如基元上的映射,因此分组是按照通常的方式执行的 HashMap。但是,如果您使用 Trove/GS/HFTC/whatever,为这些库中定义的数据结构编写额外的原始收集器以获得更高性能并不难。

也许如果使用方法引用而不是 lambda,原始流收集所需的代码将不会看起来那么复杂。

MyResult result = businessObjs.stream()
                              .mapToInt( ... )
                              .collect( 
                                  MyComplexComputationBuilder::new,
                                  MyComplexComputationBuilder::add,
                                  MyComplexComputationBuilder::merge)
                              .build(); //prev collect returns Builder object

在 Brian 的 中,他提到了另外两个 Java 集合框架,它们确实具有原始集合,实际上可以与原始流上的 collect 方法一起使用。我认为用原始流来说明如何在这些框架中使用原始容器的一些示例可能会很有用。下面的代码也适用于并行流。

// Eclipse Collections
List<Integer> integers = Interval.oneTo(5).toList();

Assert.assertEquals(
        IntInterval.oneTo(5),
        integers.stream()
                .mapToInt(Integer::intValue)
                .collect(IntArrayList::new, IntArrayList::add, IntArrayList::addAll));

// Trove Collections

Assert.assertEquals(
        new TIntArrayList(IntStream.range(1, 6).toArray()),
        integers.stream()
                .mapToInt(Integer::intValue)
                .collect(TIntArrayList::new, TIntArrayList::add, TIntArrayList::addAll));

注意:我是 Eclipse Collections 的提交者。