Saving to database in stream pipeline

As per the documentation on Oracle's website:

Side-effects in behavioral parameters to stream operations are, in general, discouraged, as they can often lead to unwitting violations of the statelessness requirement, as well as other thread-safety hazards.

Does this include saving the elements of the stream to a database?

Imagine the following (pseudo)code:

public SavedCar saveCar(Car car) {
  SavedCar savedCar = this.getDb().save(car);
  return savedCar;
}

public List<SavedCar> saveCars(List<Car> cars) {
  return cars.stream()
           .map(this::saveCar)
           .collect(Collectors.toList());
}

What are the downsides of that implementation compared to this one:

public SavedCar saveCar(Car car) {
  SavedCar savedCar = this.getDb().save(car);
  return savedCar;
}

public List<SavedCar> saveCars(List<Car> cars) {
  List<SavedCar> savedCars = new ArrayList<>();
  for (Car car : cars) {
    savedCars.add(this.saveCar(car));
  }
  return savedCars;
}

The absolute simplest example is:

cars.stream()
    .map(this::saveCar)
    .count();

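In this case, starting with Java 9, the map will not be executed at all, because it is not needed to know the count: the source is a List whose size is already known, and map does not change the number of elements.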
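A minimal sketch of this elision, using plain Strings as hypothetical stand-ins for Car and a counter in place of the database call. The 0 shown below is what OpenJDK 9+ does (the spec merely permits it), and adding a filter(), which makes the size unknown, forces the mapper to run again:

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

class CountElisionDemo {

    // Counts how often the mapper runs when the pipeline ends in count().
    static int mapperCallsWithCount() {
        List<String> cars = List.of("a", "b", "c");
        AtomicInteger calls = new AtomicInteger();
        cars.stream()
            .map(c -> { calls.incrementAndGet(); return c.toUpperCase(); })
            .count(); // size is known from the List, so map can be skipped
        return calls.get();
    }

    // filter() clears the "size known" property, so count() must traverse.
    static int mapperCallsWithFilterThenCount() {
        List<String> cars = List.of("a", "b", "c");
        AtomicInteger calls = new AtomicInteger();
        cars.stream()
            .filter(c -> true)
            .map(c -> { calls.incrementAndGet(); return c.toUpperCase(); })
            .count();
        return calls.get();
    }

    public static void main(String[] args) {
        System.out.println(mapperCallsWithCount());           // 0 on OpenJDK 9+
        System.out.println(mapperCallsWithFilterThenCount()); // 3
    }
}
```

Both pipelines return the same count; only whether the side effect happens differs, which is exactly why relying on it is fragile.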

There are multiple other cases where side effects can cause you a lot of pain, under certain conditions.
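One such case (my own illustration, not necessarily one the answer had in mind) is a short-circuiting terminal operation such as findFirst(): on a sequential stream, OpenJDK stops pulling elements once it has a result, so only the first car would be "saved". The in-memory list below is a hypothetical stand-in for the database:

```java
import java.util.ArrayList;
import java.util.List;

class ShortCircuitDemo {

    // Returns the cars that were actually "saved" while evaluating findFirst().
    static List<String> savedBy(List<String> cars) {
        List<String> db = new ArrayList<>(); // hypothetical stand-in for the database
        cars.stream()
            .map(car -> { db.add(car); return "saved:" + car; }) // side effect
            .findFirst(); // short-circuits after the first mapped element
        return db;
    }

    public static void main(String[] args) {
        System.out.println(savedBy(List.of("a", "b", "c"))); // [a]
    }
}
```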

As per the documentation on Oracle's website [...]

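That link is for Java 8. You might want to read the documentation for Java 9 (released in 2017) and later versions, as they are more explicit about this. Specifically: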

A stream implementation is permitted significant latitude in optimizing the computation of the result. For example, a stream implementation is free to elide operations (or entire stages) from a stream pipeline -- and therefore elide invocation of behavioral parameters -- if it can prove that it would not affect the result of the computation. This means that side-effects of behavioral parameters may not always be executed and should not be relied upon, unless otherwise specified (such as by the terminal operations forEach and forEachOrdered). (For a specific example of such an optimization, see the API note documented on the count() operation. For more detail, see the side-effects section of the stream package documentation.)

Source: Java 9's Javadoc for the Stream interface.

And an updated version of the documentation you quoted:

Side-effects

Side-effects in behavioral parameters to stream operations are, in general, discouraged, as they can often lead to unwitting violations of the statelessness requirement, as well as other thread-safety hazards.
If the behavioral parameters do have side-effects, unless explicitly stated, there are no guarantees as to:

  • the visibility of those side-effects to other threads;
  • that different operations on the "same" element within the same stream pipeline are executed in the same thread; and
  • that behavioral parameters are always invoked, since a stream implementation is free to elide operations (or entire stages) from a stream pipeline if it can prove that it would not affect the result of the computation.

The ordering of side-effects may be surprising. Even when a pipeline is constrained to produce a result that is consistent with the encounter order of the stream source (for example, IntStream.range(0,5).parallel().map(x -> x*2).toArray() must produce [0, 2, 4, 6, 8]), no guarantees are made as to the order in which the mapper function is applied to individual elements, or in what thread any behavioral parameter is executed for a given element.

The eliding of side-effects may also be surprising. With the exception of terminal operations forEach and forEachOrdered, side-effects of behavioral parameters may not always be executed when the stream implementation can optimize away the execution of behavioral parameters without affecting the result of the computation. (For a specific example see the API note documented on the count operation.)

Source: Java 9's Javadoc for the java.util.stream package.

All emphasis is mine.

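As you can see, the current official documentation goes into more detail about the problems you might run into if you decide to use side effects in stream operations. It is also very clear that forEach and forEachOrdered are the only terminal operations whose side effects are guaranteed to be executed (mind you, thread-safety issues still apply, as the official examples show).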
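The ordering point from the quoted package docs can be observed with a small sketch based on its own IntStream.range(0,5).parallel().map(x -> x*2).toArray() example. The mapper records each input in a thread-safe ConcurrentLinkedQueue (my choice, since the mapper may run on several threads); the result array is fixed, while the order in the log is not:

```java
import java.util.Arrays;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.stream.IntStream;

class ParallelOrderDemo {

    // Doubles 0..4 in parallel, logging each input the mapper sees.
    static int[] doubled(Queue<Integer> mapperLog) {
        return IntStream.range(0, 5)
                .parallel()
                .map(x -> { mapperLog.add(x); return x * 2; }) // side effect
                .toArray(); // result respects encounter order
    }

    public static void main(String[] args) {
        Queue<Integer> log = new ConcurrentLinkedQueue<>();
        System.out.println(Arrays.toString(doubled(log))); // [0, 2, 4, 6, 8]
        System.out.println(log); // all of 0..4, in an order that may vary between runs
    }
}
```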


That being said, regarding your specific code, and only that code:

public List<SavedCar> saveCars(List<Car> cars) {
  return cars.stream()
           .map(this::saveCar)
           .collect(Collectors.toList());
}

I see no Streams-related problems with the above code.

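  • The .map() step will be executed, because .collect() (a mutable reduction operation, which the official docs recommend instead of things like .forEach(list::add)) relies on the output of .map(), and since this output (i.e. that of saveCar()) is different from its input, the stream cannot "prove that [eliding] it would not affect the result of the computation".
  • It is not a parallelStream(), so it should not introduce any concurrency problems that did not exist before. (Of course, if someone later adds a .parallel(), problems may arise, just as they would if someone decided to parallelize the for loop by starting new threads for the inner computations.)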
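To see the first point in action, here is a runnable sketch of the question's code. Car, SavedCar, getDb(), saveCar() and saveCars() come from the question; the Db class, its in-memory rows list, and the plate/id fields are hypothetical stand-ins for a real database layer:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;

class SaveCarsDemo {

    // Hypothetical domain types standing in for the question's Car / SavedCar.
    static final class Car {
        final String plate;
        Car(String plate) { this.plate = plate; }
    }

    static final class SavedCar {
        final long id;
        final String plate;
        SavedCar(long id, String plate) { this.id = id; this.plate = plate; }
    }

    // Hypothetical in-memory "database": save() records the row and assigns an id.
    static final class Db {
        final List<String> rows = new ArrayList<>();
        SavedCar save(Car car) {
            rows.add(car.plate);
            return new SavedCar(rows.size(), car.plate);
        }
    }

    private final Db db = new Db();

    Db getDb() { return db; }

    SavedCar saveCar(Car car) {
        return getDb().save(car);
    }

    // collect() consumes every mapped value, so map(saveCar) cannot be elided.
    List<SavedCar> saveCars(List<Car> cars) {
        return cars.stream()
                   .map(this::saveCar)
                   .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        SaveCarsDemo repo = new SaveCarsDemo();
        List<SavedCar> saved = repo.saveCars(List.of(new Car("A-1"), new Car("B-2")));
        System.out.println(saved.size());      // 2
        System.out.println(repo.getDb().rows); // [A-1, B-2]
    }
}
```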

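This does not mean that the code in that example is Good Code™. The sequence .stream.map(::someSideEffect()).collect(), as a way of performing a side-effecting operation on each item of a collection, may look simpler/shorter/more elegant than its for counterpart, and sometimes it may well be. However, as Eugene, Holger and a few others told you, there are better ways to approach this.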
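A quick thought: the cost of starting a Stream versus iterating a simple for loop is not negligible unless you have a lot of items, and if you have a lot of items then you: a) probably don't want to make a separate database access for each item, so a saveAll(List items) API would be better; and b) probably don't want to take the performance hit of processing a lot of items sequentially, so you would end up using parallelization, and then a whole new set of problems arises.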
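Point a) can be sketched by counting "round trips". CountingStore, save() and saveAll() below are hypothetical, in-memory stand-ins; a real implementation might sit on something like a JDBC batch or a bulk insert, which this sketch does not model:

```java
import java.util.ArrayList;
import java.util.List;

class BatchSaveDemo {

    // Hypothetical store that counts simulated round trips to the database.
    static final class CountingStore {
        int roundTrips = 0;
        final List<String> rows = new ArrayList<>();

        // One simulated round trip per saved item.
        String save(String car) {
            roundTrips++;
            rows.add(car);
            return "saved:" + car;
        }

        // One simulated round trip for the whole batch.
        List<String> saveAll(List<String> cars) {
            roundTrips++;
            List<String> result = new ArrayList<>(cars.size());
            for (String car : cars) {
                rows.add(car);
                result.add("saved:" + car);
            }
            return result;
        }
    }

    public static void main(String[] args) {
        List<String> cars = List.of("a", "b", "c");

        CountingStore perItem = new CountingStore();
        for (String car : cars) {
            perItem.save(car);
        }

        CountingStore batched = new CountingStore();
        batched.saveAll(cars);

        System.out.println(perItem.roundTrips); // 3
        System.out.println(batched.roundTrips); // 1
    }
}
```

The batch call also keeps the side effect in a single, explicit place, instead of hiding it inside a stream's behavioral parameter.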