Java 8 Stream 抛出 RuntimeException 时的预期行为是什么?

What is the expected behavior when a Java 8 Stream throw a RuntimeException?

流处理中遇到RuntimeException,流处理是否应该中止?它应该先完成吗?是否应该在 Stream.close() 上重新抛出异常?异常是按原样重新抛出还是被包装了? Stream and package java.util.stream的JavaDoc对此无话可说。

我发现的关于 Whosebug 的所有问题似乎都集中在如何从功能接口中包装已检查的异常以使其代码编译。事实上,互联网上的博客文章和类似文章都关注相同的警告。这不是我关心的。

根据我自己的经验,我知道 顺序 流的处理将在抛出 RuntimeException 后立即中止,并且此异常将按原样重新抛出。这与并行流相同仅当客户端线程抛出异常时。

但是,示例代码 put here 表明,如果在并行流处理期间异常是由 "worker thread"(= 与调用终端操作的线程不同的线程)引发的,则此异常将永远丢失,流处理完成。

示例代码将首先 运行 和 IntStream in parallel. Then a "normal" Stream 并行。

示例将表明,

1) IntStream 在遇到 RuntimeException 时中止并行处理没有问题。异常被重新抛出,包裹在另一个 RuntimeException 中。

2) Stream 玩起来没那么好。事实上,客户端线程永远不会看到抛出 RuntimeException 的踪迹。流不仅完成处理; 将处理比 limit() 指定的更多 个元素!

在示例代码中,IntStream 是使用 IntStream.range(). The "normal" Stream has no notion of a "range" and is instead made up of 1:s, but Stream.limit() 生成的,调用时将流限制为十亿个元素。

这是另一个转折点。生成 IntStream 的示例代码执行如下操作:

IntStream.range(0, 1_000_000_000).parallel().forEach(..)

将其更改为生成的流,就像代码中的第二个示例一样:

IntStream.generate(() -> 1).limit(1_000_000_000).parallel().forEach(..)

此 IntStream 的结果是相同的:异常被包装并重新抛出,处理中止。但是,第二个流现在也将包装并重新抛出异常,并且不会处理超过限制的元素!因此: 改变第一个流的产生方式会对第二个流的行为产生副作用。对我来说,这很奇怪。

ForkJoinPool.invoke() and ForkJoinTask 的 JavaDoc 说异常被重新抛出,这是我对并行流的期望。

背景

我在处理取自 Collection.stream().parallel() 的并行流中的元素时遇到了这个 "problem"(我还没有验证 Collection.parallelStream() 的行为,但它应该是相同的)。发生的事情是 "worker thread" 崩溃,然后在所有其他线程成功完成流时静静地离开。我的应用程序使用 default exception handler 将异常写入日志文件。但甚至没有创建此日志文件。线程和他的异常就这样消失了。由于我需要在捕获到 运行time 异常后立即中止,因此一种替代方法是编写将此异常泄漏给其他工作人员的代码,使他们在任何其他线程抛出异常时不愿继续进行。当然,这并不能保证流实现只是继续产生新的线程来尝试完成流。所以我可能最终不会使用并行流,而是使用线程 pool/executor.

进行 "normal" 并发编程

这表明丢失 运行 时间异常的问题并不孤立于由 Stream.generate() 生成的流或使用 Stream.limit() 的流。最重要的是,我很想知道预期的行为是什么?

这两个流在异常报告方面的行为没有区别,问题是您将两个测试一个接一个地放入一个方法中,并让它们访问共享数据结构。

有一个微妙的、可能没有充分记录(如果有意)的行为:当流操作异常完成时,它不会等待所有并发操作完成。

因此,当您捕捉到第一个流操作的异常时,仍有一些线程 运行正在访问您的共享数据。因此,当您重置 AtomicBoolean 时,属于第一个作业的其中一个线程将读取 false 值,将其转换为 true,打印消息并抛出丢失的异常,因为流操作已经异常完成。此外,其中一些线程会在您重置 后增加您的计数器 ,这就是为什么它的数字高于第二个作业允许的数字。您的第二份工作没有异常完成,因为属于第二份工作的所有线程将从 AtomicBoolean.

中读取 true

有一些方法可以发现这一点。

当您删除第一个流操作时,第二个将按预期异常完成。另外,插入语句

ForkJoinPool.commonPool().awaitQuiescence(1, TimeUnit.DAYS);

在两个流操作之间将解决问题,因为它等待所有线程完成。

但是,更简洁的解决方案是让两个流操作都使用自己的计数器和标志。

就是说,如果您只是交换这两个操作,则有一个细微的、依赖于实现的差异会导致问题消失。 IntStream.range 操作生成一个已知大小的流,允许将其拆分为本质上知道要处理多少元素的并发任务。这允许在上述异常情况下放弃这些任务。另一方面,将 generate 返回的无限流与 limit 组合不会产生一定大小的流(尽管这是可能的)。由于这样的流被视为具有未知大小,因此子任务必须在计数器上同步以确保遵守限制。这会导致子任务(有时)完成,即使在特殊情况下也是如此。但如前所述,这是实现细节的副作用,而不是有意等待完成。由于它是关于并发的,如果你 运行 多次,结果可能会有所不同。