在一次通过中执行多次减少

Performing more than one reduction in a single pass

在流的单次传递中执行多次归约的成语是什么?如果需要不止一种类型的归约计算,即使这违反了 SRP,也只是拥有一个大的归约者 class 吗?

您可能希望避免多次通过,因为流水线阶段可能很昂贵。或者您希望避免收集中间值以便通过多个收集器 运行 它们,因为存储所有值的成本可能太高。

一样,Collectors.summarizingInt 将收集 int 个值并对它们执行多次归约,返回一个名为 IntSummaryStatistics 的聚合结构。有类似的收集器用于汇总 doublelong 值。

不幸的是,这些只执行一组固定的归约,所以如果你想做不同于他们所做的归约,你必须编写自己的收集器。

这是一种在单次传递中使用多个不相关的收集器的技术。我们可以使用 peek() 对通过流的每个值进行破解,使其不受干扰地通过。 peek() 操作采用 Consumer,因此我们需要一种方法将 Collector 适配为 ConsumerConsumer 将是收集器的 accumulator 函数。但我们还需要调用 Collector 的 supplier 函数并存储它创建的对象以传递给 accumulator 函数。我们需要一种方法从收集器中获取结果。为此,我们将 Collector 包装在一个小助手中 class:

public class PeekingCollector<T,A,R> {
    final Collector<T,A,R> collector;
    final A acc;

    public PeekingCollector(Collector<T,A,R> collector) {
        this.collector = collector;
        this.acc = collector.supplier().get();
    }

    public Consumer<T> peek() {
        if (collector.characteristics().contains(Collector.Characteristics.CONCURRENT))
            return t -> collector.accumulator().accept(acc, t);
        else
            return t -> {
                synchronized (this) {
                    collector.accumulator().accept(acc, t);
                }
            };
    }

    public synchronized R get() {
        return collector.finisher().apply(acc);
    }
}

要使用它,我们首先必须创建包装收集器并挂在它上面。然后我们 运行 管道并调用 peek,传递包装的收集器。最后我们在包装收集器上调用 get 来获得它的结果。这是一个简单的示例,它过滤和排序一些单词,同时还按首字母对它们进行分组:

    List<String> input = Arrays.asList(
        "aardvark", "crocodile", "antelope",
        "buffalo", "bustard", "cockatoo",
        "capybara", "bison", "alligator");

    PeekingCollector<String,?,Map<String,List<String>>> grouper =
        new PeekingCollector<>(groupingBy(s -> s.substring(0, 1)));

    List<String> output = input.stream()
                               .filter(s -> s.length() > 5)
                               .peek(grouper.peek())
                               .sorted()
                               .collect(toList());

    Map<String,List<String>> groups = grouper.get();
    System.out.println(output);
    System.out.println(groups);

输出为:

[aardvark, alligator, antelope, buffalo, bustard, capybara, cockatoo, crocodile]
{a=[aardvark, antelope, alligator], b=[buffalo, bustard], c=[crocodile, cockatoo, capybara]}

这有点麻烦,因为您必须写出包装收集器的泛型类型(这有点不寻常;它们通常都是推断出来的)。但是,如果处理或存储流值的开销足够大,也许值得这么麻烦。

最后请注意,如果流是 运行 并行,则可以从多个线程调用 peek()。因此,非线程安全的收集器必须由 synchronized 块保护。如果收集器是线程安全的,我们就不需要在调用它时进行同步。为了确定这一点,我们检查收集器的 CONCURRENT 特征。如果您 运行 一个并行流,最好在 peek 操作中放置一个并发收集器(例如 groupingByConcurrenttoConcurrentMap),否则包装收集器中的同步可能造成瓶颈并减慢整个流。