Calling sequential on parallel stream makes all previous operations sequential

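I have a large set of data. I want to call a slow but side-effect-free method on each element, and then call a fast method with side effects on the result of the first one. I'm not interested in the intermediate results, so I would rather not collect them.

The obvious solution is to create a parallel stream, make the slow call, make the stream sequential again, and then make the fast call. The problem is that all of the code then executes in a single thread, with no actual parallelism.

Example code: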

@Test
public void testParallelStream() throws ExecutionException, InterruptedException
{
    ForkJoinPool forkJoinPool = new ForkJoinPool(Runtime.getRuntime().availableProcessors() * 2);
    Set<String> threads = forkJoinPool.submit(()-> new Random().ints(100).boxed()
            .parallel()
            .map(this::slowOperation)
            .sequential()
            .map(Function.identity())//some fast operation, but must be in single thread
            .collect(Collectors.toSet())
    ).get();
    System.out.println(threads);
    Assert.assertEquals(Runtime.getRuntime().availableProcessors() * 2, threads.size());
}

private String slowOperation(int value)
{
    try
    {
        Thread.sleep(100);
    }
    catch (InterruptedException e)
    {
        e.printStackTrace();
    }
    return Thread.currentThread().getName();
}

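If I remove sequential(), the slow operation runs in parallel as expected, but then, obviously, the operation that must not run in parallel is also called from multiple threads.

Could you recommend some references about this behavior, or a way to avoid the temporary collection?

In the current implementation, a Stream is either entirely parallel or entirely sequential. The Javadoc does not state this explicitly, and it could change in the future, but it does say this is possible: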

S parallel()

Returns an equivalent stream that is parallel. May return itself, either because the stream was already parallel, or because the underlying stream state was modified to be parallel.

If you need the function to run in only one thread at a time, I suggest you use a Lock or a synchronized block/method.
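
A minimal sketch of the synchronized approach, under assumptions: the class name, the `lock` object, and the `fastOperation` side effect (adding to a plain `HashSet`) are made up for illustration and are not from the question. Note that this serializes the fast step so only one thread runs it at a time; it does not pin it to one particular thread.

```java
import java.util.HashSet;
import java.util.Set;
import java.util.stream.IntStream;

public class SynchronizedSideEffect {
    private final Object lock = new Object();
    // Hypothetical shared state; HashSet is not thread-safe on its own.
    private final Set<Integer> seen = new HashSet<>();

    // Stand-in for the slow, side-effect-free step.
    static int slowOperation(int value) {
        try {
            Thread.sleep(10);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return value * 2;
    }

    // Fast step with a side effect; the lock lets one thread in at a time.
    void fastOperation(int value) {
        synchronized (lock) {
            seen.add(value);
        }
    }

    Set<Integer> run(int count) {
        IntStream.range(0, count)
                .parallel()
                .map(SynchronizedSideEffect::slowOperation) // runs on several threads
                .forEach(this::fastOperation);              // serialized by the lock
        synchronized (lock) {
            return new HashSet<>(seen);
        }
    }

    public static void main(String[] args) {
        System.out.println(new SynchronizedSideEffect().run(50).size());
    }
}
```

The whole pipeline stays parallel here, so no intermediate collection is needed, at the cost of lock contention if the fast step is not actually fast.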

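Switching a stream from parallel() to sequential() partway through worked in the initial Stream API design, but it caused many problems, and the implementation was finally changed so that these calls just turn the parallel flag on and off for the whole pipeline. The current documentation is indeed vague, but it was improved in Java 9: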

The stream pipeline is executed sequentially or in parallel depending on the mode of the stream on which the terminal operation is invoked. The sequential or parallel mode of a stream can be determined with the BaseStream.isParallel() method, and the stream's mode can be modified with the BaseStream.sequential() and BaseStream.parallel() operations. The most recent sequential or parallel mode setting applies to the execution of the entire stream pipeline.
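
The "most recent setting wins" rule can be checked with isParallel(). This is a small sketch; the class and method names are made up for illustration.

```java
import java.util.stream.IntStream;

public class StreamModeDemo {
    // parallel() first, sequential() last: the whole pipeline is sequential.
    static boolean parallelThenSequential() {
        return IntStream.range(0, 10)
                .parallel()
                .map(i -> i + 1)
                .sequential()
                .isParallel();
    }

    // sequential() first, parallel() last: the whole pipeline is parallel.
    static boolean sequentialThenParallel() {
        return IntStream.range(0, 10)
                .sequential()
                .map(i -> i + 1)
                .parallel()
                .isParallel();
    }

    public static void main(String[] args) {
        System.out.println(parallelThenSequential()); // false
        System.out.println(sequentialThenParallel()); // true
    }
}
```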

As for your problem, you can collect everything into an intermediate List and start a new sequential pipeline:

new Random().ints(100).boxed()
        .parallel()
        .map(this::slowOperation)
        .collect(Collectors.toList())
        // Start new stream here
        .stream()
        .map(Function.identity())//some fast operation, but must be in single thread
        .collect(Collectors.toSet());
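
A self-contained version of the same idea, as a sketch: it records which threads run each stage so the split can be observed. The class name, the thread-name sets, and the bodies of `slowOperation` and `fastOperation` are assumptions for illustration only.

```java
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

public class TwoStagePipeline {
    // Names of the threads that executed each stage.
    final Set<String> slowThreads = ConcurrentHashMap.newKeySet();
    final Set<String> fastThreads = ConcurrentHashMap.newKeySet();

    int slowOperation(int value) {
        slowThreads.add(Thread.currentThread().getName());
        try {
            Thread.sleep(10);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return value;
    }

    int fastOperation(int value) {
        fastThreads.add(Thread.currentThread().getName());
        return value;
    }

    Set<Integer> run(int count) {
        // Stage 1: parallel pipeline, ended by a terminal collect.
        List<Integer> intermediate = IntStream.range(0, count)
                .parallel()
                .map(this::slowOperation)
                .boxed()
                .collect(Collectors.toList());
        // Stage 2: a new, sequential stream that runs in the calling thread.
        return intermediate.stream()
                .map(this::fastOperation)
                .collect(Collectors.toSet());
    }

    public static void main(String[] args) {
        TwoStagePipeline p = new TwoStagePipeline();
        p.run(40);
        System.out.println("slow stage threads: " + p.slowThreads);
        System.out.println("fast stage threads: " + p.fastThreads);
    }
}
```

The fast stage always reports a single thread, the one that called run(). How many threads the slow stage uses depends on the pool and the number of available processors.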