在一组文档上使用 ForkJoinPool

Using ForkJoinPool on a set of documents

我从未使用过 ForkJoinPool,我偶然发现了这段代码。

我有一个Set<Document> docs。文档有一个写方法。如果我执行以下操作,是否需要 get 或 join 以确保集合中的所有文档都已正确完成其写入方法?

ForkJoinPool pool = new ForkJoinPool(concurrencyLevel);
pool.submit(() -> docs.parallelStream().forEach(
    doc -> {
        doc.write();
    })
);

如果其中一个文档无法完成写入会怎样?说它抛出异常。给出的代码是否等待所有文档完成写入操作?

ForkJoinPool.submit(Runnable) returns a ForkJoinTask representing the pending completion of the task. If you want to wait for all documents to be processed, you need some form of synchronization with that task, like calling its get() method(来自Future界面)。

关于异常处理,像往常一样,流处理过程中的任何异常都会停止它。但是你必须参考 Stream.forEach(Consumer):

的文档

The behavior of this operation is explicitly nondeterministic. For parallel stream pipelines, this operation does not guarantee to respect the encounter order of the stream, as doing so would sacrifice the benefit of parallelism. For any given element, the action may be performed at whatever time and in whatever thread the library chooses. […]

这意味着如果发生异常,您无法保证将写入哪个文档。处理将停止,但您无法控制仍将处理哪个文档。

如果您想确保处理剩余的文档,我建议 2 个解决方案:

  • try/catch 包围 document.write() 以确保没有异常传播,但这使得很难检查哪个文档成功或是否有任何失败;或
  • 使用另一个解决方案来管理您的并行处理,例如 CompletableFuture API。正如评论中所指出的,您当前的解决方案是一种黑客技术,由于实施细节而有效,因此最好做一些更清洁的事情。

使用CompletableFuture,您可以按如下方式进行:

List<CompletableFuture<Void>> futures = docs.stream()
                    .map(doc -> CompletableFuture.runAsync(doc::write, pool))
                    .collect(Collectors.toList());

这将确保所有文档都得到处理,并检查返回列表中的每个未来是否成功。