Java8 streams sequential and parallel execution produce different results?

Running the following stream example in Java 8:

    System.out.println(Stream
        .of("a", "b", "c", "d", "e", "f")
        .reduce("", (s1, s2) -> s1 + "/" + s2)
    );

yields:

/a/b/c/d/e/f

Of course this is no surprise. According to the documentation (http://docs.oracle.com/javase/8/docs/api/index.html?overview-summary.html), it should not matter whether the stream executes sequentially or in parallel:

Except for operations identified as explicitly nondeterministic, such as findAny(), whether a stream executes sequentially or in parallel should not change the result of the computation.

AFAIK reduce() is deterministic and (s1, s2) -> s1 + "/" + s2 is associative, so adding parallel() should produce the same result:

    System.out.println(Stream
            .of("a", "b", "c", "d", "e", "f")
            .parallel()
            .reduce("", (s1, s2) -> s1 + "/" + s2)
    );

However, the result on my machine is:

/a//b//c//d//e//f

What is going on here?

BTW: Using the (preferred) .collect(Collectors.joining("/")) instead of reduce(...) produces the same result a/b/c/d/e/f for both sequential and parallel execution.
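For reference, a rough sketch of that collector variant (assuming the usual java.util.stream.Stream and java.util.stream.Collectors imports):

    // Collectors.joining inserts the delimiter only between elements, so both
    // sequential and parallel execution print a/b/c/d/e/f.
    System.out.println(Stream
            .of("a", "b", "c", "d", "e", "f")
            .parallel()
            .collect(Collectors.joining("/"))
    );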

JVM details:

java.specification.version: 1.8
java.version: 1.8.0_31
java.vm.version: 25.31-b07
java.runtime.version: 1.8.0_31-b13

From the documentation of reduce:

The identity value must be an identity for the accumulator function. This means that for all t, accumulator.apply(identity, t) is equal to t.

That does not hold in your case - "" and "a" produce "/a".

I extracted the accumulator function and added a printout to show what is happening:

    BinaryOperator<String> accumulator = (s1, s2) -> {
        System.out.println("joining \"" + s1 + "\" and \"" + s2 + "\"");
        return s1 + "/" + s2;
    };
    System.out.println(Stream
            .of("a", "b", "c", "d", "e", "f")
            .parallel()
            .reduce("", accumulator)
    );

Here is a sample output (it varies between runs):

joining "" and "d"
joining "" and "f"
joining "" and "b"
joining "" and "a"
joining "" and "c"
joining "" and "e"
joining "/b" and "/c"
joining "/e" and "/f"
joining "/a" and "/b//c"
joining "/d" and "/e//f"
joining "/a//b//c" and "/d//e//f"
/a//b//c//d//e//f
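To make the combining step visible: the two-argument reduce used above behaves like the three-argument overload with the accumulator also acting as the combiner. The following sketch (variable names are my own) spells out both roles:

    // Sketch: in parallel, each element is first folded into the identity "",
    // producing "/a", "/b", ..., and those partial strings are then combined
    // with the same kind of function, which adds the extra slashes.
    String result = Stream
            .of("a", "b", "c", "d", "e", "f")
            .parallel()
            .reduce("",
                    (partial, element) -> partial + "/" + element, // accumulator
                    (left, right) -> left + "/" + right);          // combiner
    System.out.println(result); // typically prints /a//b//c//d//e//f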

You can add a check in the function to handle the empty string separately:

    System.out.println(Stream
            .of("a", "b", "c", "d", "e", "f")
            .parallel()
            .reduce("", (s1, s2) -> s1.isEmpty() ? s2 : s1 + "/" + s2)
    );

As Marko Topolnik noted, there is no need to check s2, since the accumulator does not have to be a commutative function.

To add to the other answers:

You will probably want to use a mutable reduction here; the documentation points out that doing something like

    String concatenated = strings.reduce("", String::concat)

will give terrible performance results.

We would get the desired result, and it would even work in parallel. However, we might not be happy about the performance! Such an implementation would do a great deal of string copying, and the run time would be O(n^2) in the number of characters. A more performant approach would be to accumulate the results into a StringBuilder, which is a mutable container for accumulating strings. We can use the same technique to parallelize mutable reduction as we do with ordinary reduction.

So you should use a StringBuilder instead.
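A minimal sketch of that mutable reduction, following the example from the package documentation (strings is assumed to be a Stream<String>):

    // Accumulate into StringBuilder instances and merge them at the end;
    // this avoids the repeated String copying done by String::concat.
    String concatenated = strings
            .collect(StringBuilder::new,     // supplier of the mutable container
                     StringBuilder::append,  // fold one element into a container
                     StringBuilder::append)  // merge two partial containers
            .toString();

For the "/"-delimited output in the question, Collectors.joining("/") (as mentioned above) remains the simplest choice.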

For someone just starting out with lambdas and streams, it took me a long time to reach the "aha" moment before I really understood what is going on here. For stream novices like me, I will rephrase it slightly so it is easier to follow (at least that is how I hope it actually answers the question).

It is all spelled out in the reduce documentation:

The identity value must be an identity for the accumulator function. This means that for all t, accumulator.apply(identity, t) is equal to t.

We can easily demonstrate that, the way the code is written, this identity requirement is broken:

    private static void checkIdentity() {
        BinaryOperator<String> operator = (s1, s2) -> s1 + "/" + s2;
        // accumulator.apply(identity, t) should equal t, but here it yields "/a"
        String result = operator.apply("", "a");
        System.out.println(result);                // /a
        System.out.println(result.equals("a"));    // false
    }

An empty string joined with another string should really yield that second string; this does not happen here, so the accumulator (BinaryOperator) does not satisfy the identity requirement, and therefore the reduce method cannot guarantee the same result when invoked in parallel.
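
For completeness, a quick check (my own sketch, assuming java.util.function.BinaryOperator is imported) that the guarded accumulator from the earlier answer does satisfy the requirement:

    BinaryOperator<String> fixed = (s1, s2) -> s1.isEmpty() ? s2 : s1 + "/" + s2;
    System.out.println(fixed.apply("", "a"));             // a
    System.out.println(fixed.apply("", "a").equals("a")); // true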