有条件地向 Java 8 流添加操作

Conditionally add an operation to a Java 8 stream

我想知道是否可以根据流外设置的某种条件向流添加操作。例如,如果我的 limit 变量不等于 -1.

,我想向流添加限制操作

我的代码目前看起来像这样,但我还没有看到其他以这种方式使用流的示例,其中 Stream 对象被重新分配给应用于自身的中间操作的结果:

// Do some stream stuff
stream = stream.filter(e -> e.getTimestamp() < max);

// Limit the stream
if (limit != -1) {
   stream = stream.limit(limit);
}

// Collect stream to list
stream.collect(Collectors.toList());

如此 Whosebug post 中所述,在调用终端操作之前不会实际应用过滤器。由于我在调用终端操作之前重新分配流的值,上面的代码是否仍然是使用 Java 8 个流的正确方法?

我认为你的第一行应该是:

stream = stream.filter(e -> e.getTimestamp() < max);

以便您在后续操作中使用过滤器返回的流而不是原始流。

有两种方法可以做到这一点

// Do some stream stuff
List<E> results = list.stream()
                  .filter(e -> e.getTimestamp() < max);
                  .limit(limit > 0 ? limit : list.size())
                  .collect(Collectors.toList());

// Do some stream stuff
stream = stream.filter(e -> e.getTimestamp() < max);

// Limit the stream
if (limit != -1) {
   stream = stream.limit(limit);
}

// Collect stream to list
List<E> results = stream.collect(Collectors.toList());

因为这是函数式编程,所以您应该始终处理每个函数的结果。您应该特别避免在这种编程风格中修改任何内容,并尽可能将所有内容视为不可变的。

Since I'm reassigning the value of stream before a terminal operation is called, is the above code still a proper way to use Java 8 streams?

它应该可以工作,但它看起来像是命令式和函数式编码的混合体。我建议按照我的第一个答案将其写成固定流。

链式调用与存储中间 return 值的一系列调用之间没有语义差异。因此,以下代码片段是等价的:

a = object.foo();
b = a.bar();
c = b.baz();

c = object.foo().bar().baz();

在任何一种情况下,每个方法都是根据前一个调用的结果调用的。但在后一种情况下,中间结果不会被存储,而是会在下一次调用时丢失。在流 API 的情况下,中间结果 不能 在调用它的下一个方法后使用,因此链接是使用流的自然方式从本质上确保您不会在 returned 引用上调用多个方法。

不过,只要您遵守不多次使用 returned 引用的约定,存储对流的引用并没有错。通过按照您的问题使用它们的方式,即用下一次调用的结果覆盖变量,您还可以确保您不会在 returned 引用上调用多个方法,因此,这是正确的用法。当然,这仅适用于相同类型的中间结果,因此当您使用 mapflatMap 时,获取不同引用类型的流时,您无法覆盖局部变量。然后你必须注意不要再次使用旧的局部变量,但是,正如所说,只要你在下次调用后不使用它,中间存储就没有问题。

有时,您必须存储它,例如

try(Stream<String> stream = Files.lines(Paths.get("myFile.txt"))) {
    stream.filter(s -> !s.isEmpty()).forEach(System.out::println);
}

请注意,该代码等效于以下备选方案:

try(Stream<String> stream = Files.lines(Paths.get("myFile.txt")).filter(s->!s.isEmpty())) {
    stream.forEach(System.out::println);
}

try(Stream<String> srcStream = Files.lines(Paths.get("myFile.txt"))) {
    Stream<String> tmp = srcStream.filter(s -> !s.isEmpty());
    // must not be use variable srcStream here:
    tmp.forEach(System.out::println);
}

它们是等价的,因为 forEach 总是在 filter 的结果上调用,而 filter 总是在 Files.lines 的结果上调用,而最终的结果是什么并不重要close() 操作被调用,因为关闭会影响整个流管道。


一句话说,你的使用方式是正确的。


我什至 更喜欢 那样做,因为当你不想应用限制时不链接 limit 操作是最简洁的表达方式你的意图。还值得注意的是,建议的替代方案可能在很多情况下都有效,但它们 not 在语义上是等价的:

.limit(condition? aLimit: Long.MAX_VALUE)

假设您可能遇到的最大元素数是 Long.MAX_VALUE,但流可以包含比这更多的元素,它们甚至可能是无限的。

.limit(condition? aLimit: list.size())

当流源为 list 时,打破了流的惰性求值。原则上,可变流源可以合法地任意更改,直到终端操作开始为止。结果将反映到目前为止所做的所有修改。当您添加包含 list.size() 的中间操作时,即此时列表的实际大小,随后对该点和终端操作之间的集合应用的修改可能会使该值具有与预期“不同的含义”实际上没有限制”语义。

“Non Interference” section of the API documentation比较:

For well-behaved stream sources, the source can be modified before the terminal operation commences and those modifications will be reflected in the covered elements. For example, consider the following code:

List<String> l = new ArrayList(Arrays.asList("one", "two"));
Stream<String> sl = l.stream();
l.add("three");
String s = sl.collect(joining(" "));

First a list is created consisting of two strings: "one"; and "two". Then a stream is created from that list. Next the list is modified by adding a third string: "three". Finally the elements of the stream are collected and joined together. Since the list was modified before the terminal collect operation commenced the result will be a string of "one two three".

当然,这是一种罕见的极端情况,因为通常情况下,程序员会在不修改其间的源集合的情况下制定整个流管道。尽管如此,不同的语义仍然存在,一旦你进入这样的极端情况,它可能会变成一个很难找到的错误。

此外,由于它们不等价,流 API 永远不会将这些值识别为“实际上没有限制”。即使指定 Long.MAX_VALUE 也意味着流实现必须跟踪已处理元素的数量以确保遵守限制。因此,不添加 limit 操作与添加程序员期望永远不会超过的数字的限制相比具有显着的性能优势。

我知道有点晚了,但我自己也有同样的问题,但没有找到令人满意的答案,然而,受这个问题和答案的启发,我得出了以下解决方案:

return Stream.of( ///< wrap target stream in other stream ;)
    /*do regular stream stuff*/ 
    stream.filter(e -> e.getTimestamp() < max)
  ).flatMap(s -> limit != -1 ? s.limit(limit) : s) ///< apply limit only if necessary and unwrap stream of stream to "normal" stream
  .collect(Collectors.toList()) ///< do final stuff