为什么不推荐基于 AtomicInteger 的 Stream 解决方案?

Why AtomicInteger based Stream solutions are not recommended?

假设我有这份水果清单:-

List<String> f = Arrays.asList("Banana", "Apple", "Grape", "Orange", "Kiwi");

我需要为每个水果添加一个序列号并打印出来。水果或序列号的顺序无关紧要。所以这是一个有效的输出:-

4. Kiwi
3. Orange
1. Grape
2. Apple
5. Banana

解决方案 #1

AtomicInteger number = new AtomicInteger(0);

String result = f.parallelStream()
        .map(i -> String.format("%d. %s", number.incrementAndGet(), i))
        .collect(Collectors.joining("\n"));

解决方案#2

String result = IntStream.rangeClosed(1, f.size())
        .parallel()
        .mapToObj(i -> String.format("%d. %s", i, f.get(i - 1)))
        .collect(Collectors.joining("\n"));

问题

为什么解决方案 #1 是不好的做法?我在很多地方看到基于 AtomicInteger 的解决方案很糟糕(比如 ),特别是在并行流处理中(这就是我在上面使用并行流的原因,尝试 运行成问题)。

我看了这些 questions/answers:-
In which cases Stream operations should be stateful?

Java 8: Preferred way to count iterations of a lambda?

他们只是提到(除非我遗漏了什么)"unexpected results can occur"。像什么?在这个例子中会发生吗?如果没有,您能否提供一个可能发生的示例?

至于“不保证mapper函数的应用顺序”,好吧,这就是并行处理的本质,所以我接受了,并且此外,在此特定示例中,顺序无关紧要。

AtomicInteger是线程安全的,并行处理应该问题不大。

有人可以举例说明在使用这种基于状态的解决方案时会出现什么情况吗?

案例 2 - 在 API IntStream 的注释中 class returns 一个顺序有序的 IntStream 从 startInclusive(包含)到 endInclusive(包含)通过 1 种 for 的增量步骤循环因此并行流正在一个一个地处理它并提供正确的顺序。

 * @param startInclusive the (inclusive) initial value
 * @param endInclusive the inclusive upper bound
 * @return a sequential {@code IntStream} for the range of {@code int}
 *         elements
 */
public static IntStream rangeClosed(int startInclusive, int endInclusive) {

情况 1 - 显然列表将被并行处理,因此顺序不正确。由于映射操作是并行执行的,因此由于线程调度差异,相同输入的结果可能与 运行 运行 不同,因此不能保证对同一 "same" 元素的不同操作流管道在同一个线程中执行,也不能保证映射器函数如何也应用于流中的特定元素。

Source Java Doc

看看 Stuart Marks 的回答 here - 他正在使用有状态谓词。

有几个潜在的问题,但如果您不关心它们或真正理解它们 - 您应该没问题。

首先是顺序,在并行处理的当前实现下展示,但如果您不关心顺序,就像您的示例中那样,您就可以了。

第二个是潜在速度 AtomicInteger 递增一个简单的 int 会慢很多倍,如前所述,如果你关心这个。

第三个更微妙。有时根本无法保证 map 会被执行,例如因为 java-9:

 someStream.map(i -> /* do something with i and numbers */)
           .count();

这里的重点是因为你是在计数,所以不需要做映射,所以跳过了。一般来说,命中某些中间操作的元素不能保证到达终端。想象一下 map.filter.map 的情况,第一个地图可能 "see" 与第二个地图相比有更多的元素,因为某些元素可能被过滤掉了。所以不建议依赖这个,除非你能准确推理出这是怎么回事。

在你的例子中,IMO,你做你做的事是非常安全的;但是如果你稍微改变了你的代码,这需要额外的推理来证明它的正确性。我会选择解决方案 2,只是因为它对我来说更容易理解,而且它没有上面列出的潜在问题。

Note also that attempting to access mutable state from behavioral parameters presents you with a bad choice with respect to safety and performance; if you do not synchronize access to that state, you have a data race and therefore your code is broken, but if you do synchronize access to that state, you risk having contention undermine the parallelism you are seeking to benefit from. The best approach is to avoid stateful behavioral parameters to stream operations entirely; there is usually a way to restructure the stream pipeline to avoid statefulness.

Package java.util.stream, Stateless behaviors

从线程安全性和正确性的角度来看,解决方案 1 没有任何问题。但是性能(作为并行处理的优势)可能会受到影响。


Why is solution #1 a bad practice?

我不会说这是不好的做法或不可接受的事情。为了性能,根本不推荐。

They just mention (unless I missed something) "unexpected results can occur". Like what?

"Unexpected results" 是一个非常宽泛的术语,通常指不正确的同步,类似 "What's the hell just happened?" 的行为。

Can it happen in this example?

事实并非如此。您可能不会 运行 陷入问题。

If not, can you provide me an example where it can happen?

AtomicInteger改成int*,把number.incrementAndGet()换成++number,你就有了


*盒装 int(例如,基于包装器、基于数组),因此您可以在 lambda

中使用它