收集器的组合器功能是否可以用于顺序流?

Can a Collector's combiner function ever be used on sequential streams?

示例程序:

public final class CollectorTest
{
    private CollectorTest()
    {
    }

    private static <T> BinaryOperator<T> nope()
    {
        return (t, u) -> { throw new UnsupportedOperationException("nope"); };
    }

    public static void main(final String... args)
    {
        final Collector<Integer, ?, List<Integer>> c
            = Collector.of(ArrayList::new, List::add, nope());

        IntStream.range(0, 10_000_000).boxed().collect(c);
    }
}

所以,为了简化这里的事情,没有最终的转换,所以生成的代码非常简单。

现在,IntStream.range() 生成顺序流。我只是将结果装入 Integer 中,然后我设计的 Collector 将它们收集到 List<Integer> 中。很简单。

而且无论我 运行 这个示例程序多少次,UnsupportedOperationException 永远不会命中,这意味着我的虚拟组合器永远不会被调用。

我有点预料到这一点,但后来我对流的误解已经够多了,我不得不问这个问题...

当流 保证 是连续的时,是否可以调用 Collector 的组合器?

正如@MarkoTopolnik 和@Duncan 在之前的评论中所观察到的那样,不能保证在顺序模式下调用 Collector.combiner() 会产生减少的结果。事实上,Java 文档在这一点上有点主观,可能导致不恰当的解释。

(...) A parallel implementation would partition the input, create a result container for each partition, accumulate the contents of each partition into a subresult for that partition, and then use the combiner function to merge the subresults into a combined result.

根据 NoBlogDefFound 组合器仅在并行模式下使用。部分引述如下:

combiner() is used to join two accumulators together into one. It is used when collector is executed in parallel, splitting input Stream and collecting parts independently first.

为了更清楚地说明这个问题,我重写了第一段代码并采用了两种方法(串行和并行)。


public final class CollectorTest
{
    private CollectorTest()
    {
    }

    private static <T> BinaryOperator<T> nope()
    {
        return (t, u) -> { throw new UnsupportedOperationException("nope"); };
    }

    public static void main(final String... args)
    {

        final Collector<Integer, ?, List<Integer>> c =
                Collector
                    .of(ArrayList::new, List::add, nope());

        // approach sequential
        Stream<Integer> sequential = IntStream
                .range(0, 10_000_000)
                .boxed();

        System.out.println("isParallel:" + sequential.isParallel());
        sequential
                .collect(c);

        // approach parallel
        Stream<Integer> parallel = IntStream
                .range(0, 10_000_000)
                .parallel()
                .boxed();

        System.out.println("isParallel:" + parallel.isParallel());
        parallel
                .collect(c);
    }
}

在运行这段代码之后我们可以得到输出:

isParallel:false
isParallel:true
Exception in thread "main" java.lang.UnsupportedOperationException: nope
    at com.Whosebug.lambda.CollectorTest.lambda$nope[=11=](CollectorTest.java:18)
    at com.Whosebug.lambda.CollectorTest$$Lambda/2001049719.apply(Unknown Source)
    at java.util.stream.ReduceOpsReducingSink.combine(ReduceOps.java:174)
    at java.util.stream.ReduceOpsReducingSink.combine(ReduceOps.java:160)

所以,根据这个结果我们可以推断Collector's combiner只能被并行执行调用。

仔细阅读 ReduceOps.java 中的流实现代码会发现,组合函数仅在 ReduceTask 完成时调用,并且 ReduceTask 实例仅在评估管道时使用在平行下。因此,在当前的实现中, 在评估顺序管道时永远不会调用组合器。

但是,规范中没有任何内容可以保证这一点。 Collector 是一个对其实现有要求的接口,并且没有为顺序流授予豁免。就个人而言,我很难想象为什么顺序流水线评估可能需要调用组合器,但比我更有想象力的人可能会找到它的巧妙用途,并实现它。规范允许,即使今天的实现没有做到,你还是要考虑一下。

这应该不足为奇。流API的设计中心是支持与顺序执行平等的并行执行。当然,程序可以观察到它是顺序执行还是并行执行。但是 API 的设计是为了支持一种允许任一种编程风格。

如果您正在编写收集器并且发现不可能(或不方便或困难)编写关联组合器函数,导致您想要将流限制为顺序执行,也许这意味着您朝着错误的方向前进。是时候退后一步,考虑以不同的方式解决问题了。

不需要关联组合器函数的常见 reduction-style 操作称为 fold-left。主要特点是严格应用折叠功能 left-to-right,一次进行一个。我不知道并行化 fold-left.

的方法

当人们试图以我们一直在谈论的方式扭曲收藏家时,他们通常会寻找类似 fold-left 的东西。 Streams API 没有对此操作的直接 API 支持,但它很容易编写。例如,假设您想使用以下操作减少字符串列表:重复第一个字符串,然后追加第二个。很容易证明这个操作不是关联的:

List<String> list = Arrays.asList("a", "b", "c", "d", "e");

System.out.println(list.stream()
    .collect(StringBuilder::new,
             (a, b) -> a.append(a.toString()).append(b),
             (a, b) -> a.append(a.toString()).append(b))); // BROKEN -- NOT ASSOCIATIVE

运行 依次产生所需的输出:

aabaabcaabaabcdaabaabcaabaabcde

但是当运行并行时,它可能会产生这样的结果:

aabaabccdde

因为它是 "works" 顺序的,我们可以通过调用 sequential() 来强制执行它,并通过让组合器抛出异常来支持它。此外,供应商必须恰好被调用一次。没有办法合并中间结果,所以如果供应商被调用两次,我们就已经有麻烦了。但是由于我们 "know" 供应商在顺序模式下只被调用一次,所以大多数人并不担心这一点。事实上,我看到有人写 "suppliers" return 一些现有的 object 而不是创建一个新的,这违反了供应商合同。

collect() 的 3-arg 形式的使用中,三个函数中有两个违反了它们的约定。这不应该告诉我们以不同的方式做事吗?

这里的主要工作是由累加器函数完成的。要完成 fold-style 减少,我们可以使用 forEachOrdered() 以严格的 left-to-right 顺序应用此函数。我们必须在前后做一些设置和完成代码,但这没问题:

StringBuilder a = new StringBuilder();
list.parallelStream()
    .forEachOrdered(b -> a.append(a.toString()).append(b));
System.out.println(a.toString());

自然地,这在并行时工作得很好,尽管 运行 并行的性能优势可能会被 forEachOrdered().

的排序要求抵消

总而言之,如果您发现自己想要进行可变缩减,但缺少关联组合器功能,导致您将流限制为顺序执行,请将问题重铸为 fold-left 运算并在你的累加器函数上使用 forEachRemaining()