从迭代器创建的 CompletableFuture 流不会延迟计算

CompletableFuture stream created from iterator is not lazily evaluated

我对可完成期货的完成方式和时间感到有些困惑。我创建了这个测试用例:

import org.junit.Test;

import java.util.Arrays;
import java.util.Iterator;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

public class StreamOfCompletableFuturesTest {
    @Test
    public void testList() {
        completeFirstTwoElements(
                Stream.of("list one", "list two", "list three", "list four", "list five")
        );
    }

    @Test
    public void testIterator() {
        Iterator<String> iterator = Arrays.asList("iterator one", "iterator two", "iterator three", "iterator four", "iterator five").iterator();

        completeFirstTwoElements(
            StreamSupport.stream(Spliterators.spliteratorUnknownSize(iterator, Spliterator.ORDERED), false)
        );
    }

    private void completeFirstTwoElements(Stream<String> stream) {
        stream
                .map(this::cf)
                .limit(2)
                .parallel()
                .forEach(cf -> {
                    try {
                        System.out.println(cf.get());
                    } catch (InterruptedException | ExecutionException e) {
                        throw new RuntimeException(e);
                    }
                });
    }

    private CompletableFuture<String> cf(String result) {
        return CompletableFuture.supplyAsync(() -> {
            System.out.println("Running " + result);
            return result;
        });
    }
}

输出为:

Running list one
Running list two
list two
list one
Running iterator one
Running iterator two
Running iterator three
Running iterator four
Running iterator five
iterator two
iterator one

testList 方法按预期工作。 CompletableFuture 只在最后计算,所以限制方法只保留前两项。

然而,testIterator方法却出乎意料。所有 CompletableFuture 都完成了,之后才进行限制。

如果我从流中删除 parallel() 方法,它将按预期工作。但是,处理(forEach())应该并行完成,因为在我的完整程序中它是一个长运行方法。

任何人都可以解释为什么会这样吗?

看起来这取决于 Java 版本,所以我在 1.8:

$ java -version
java version "1.8.0_92"
Java(TM) SE Runtime Environment (build 1.8.0_92-b14)
Java HotSpot(TM) 64-Bit Server VM (build 25.92-b14, mixed mode)

并行适用于整个管道,因此您无法真正控制在并行应用 limit() 之前执行的内容 Stream。唯一的保证是 limit() 之后的内容只会在保留的元素上执行。

两者之间的差异可能是由于一些实现细节或其他 Stream 特征。事实上,您可以通过使用 SIZED 特征轻松反转行为。似乎当 Stream 的大小已知时,只处理了 2 个元素。

因此,例如,应用一个简单的 filter() 将丢失列表版本的大小:

completeFirstTwoElements(
        Stream.of("list one", "list two", "list three", "list four", "list five").filter(a -> true)
);

例如输出:

Running list one
Running list five
Running list two
Running list three
list one
list two

并且不使用 Spliterator.spliterator() "fixes" 的 未知大小 版本的行为:

Iterator<String> iterator = Arrays.asList("iterator one", "iterator two", "iterator three", "iterator four", "iterator five").iterator();

completeFirstTwoElements(
        StreamSupport.stream(Spliterators.spliterator(iterator, Spliterator.ORDERED, 5), false)
);

输出:

Running iterator two
Running iterator one
iterator one
iterator two

你说的“所有CompletableFuture已完成”等同于“所有CompletableFuture已创建”,一旦supplyAsync执行完毕,对供应商的评价就完成了已安排,无论是否有人最终会调用 get

所以你在这里感知到的是传递给map的函数的求值,即使后续处理不会消耗结果。这是一个有效的行为;该函数可能会以任意顺序甚至并发地为超出必要的更多元素进行评估,只要 Stream 之后将使用正确的结果,就限制和遇到顺序而言。

现在,是否会评估比必要更多的元素以及处理多少多余的元素,是一个实现细节,并且实现已更改,如“”中所述。虽然该问答是关于无序流的,但有可能对有序流进行了类似的改进。

要点是,您不应假设仅针对最少数量的必需元素评估函数。这样做会降低并行处理的效率。这仍然适用,即使 Java 9 改进了并行 limit 操作。一个简单的更改可能会重新引入更多元素的评估:

private void completeFirstTwoElements(Stream<String> stream) {
    stream.map(this::cf)
          .filter(x -> true)
          .limit(2)
          .parallel()
          .forEach(cf -> System.out.println(cf.join()));
}