流如何停止?

How do streams stop?

我想知道当我使用 Stream.generate 创建自己的无限流时,标准库中的流如何停止...

例如,当您有一个包含记录的列表时:

List<Record> records = getListWithRecords();
records.stream().forEach(/* do something */);

流不会是无限的并且运行永远,但是当遍历列表中的所有项目时它会停止。但它是如何工作的呢?相同的功能适用于 Files.lines(path) 创建的流(来源:http://www.mkyong.com/java8/java-8-stream-read-a-file-line-by-line/)。

第二个问题,如何以相同的方式停止使用 Stream.generate 创建的流?

有限流根本不是通过 Stream.generate 创建的。

实现流的标准方法是实现 Spliterator, sometimes using the Iterator detour。在任何一种情况下,实现都有一种方法来报告结束,例如当 Spliterator.tryAdvance returns false 或其 forEachRemaining 方法只是 returns,或者在 Iterator 源的情况下,当 hasNext() returns false.

A Spliterator 甚至可以在处理开始之前报告预期的元素数量。

流,通过 Stream 接口内的工厂方法之一创建,如 Stream.generate 也可以通过 Spliterator 或使用流的内部功能来实现实现,但无论它们是如何实现的,您都无法通过这种实现来改变它们的行为,因此使这种流成为有限流的唯一方法是将 limit 操作链接到流。

如果您想创建一个不受数组或集合支持的非空有限流,并且 none 现有流源适合,您必须实现自己的 Spliteratorcreate a stream out of it。如上所述,您可以使用现有的方法从 Iterator 中创建 Spliterator,但是您应该抵制仅仅因为它很熟悉就使用 Iterator 的诱惑。一个Spliterator不难实现:

/** like {@code Stream.generate}, but with an intrinsic limit */
static <T> Stream<T> generate(Supplier<T> s, long count) {
    return StreamSupport.stream(
               new Spliterators.AbstractSpliterator<T>(count, Spliterator.SIZED) {
        long remaining=count;

        public boolean tryAdvance(Consumer<? super T> action) {
            if(remaining<=0) return false;
            remaining--;
            action.accept(s.get());
            return true;
        }
    }, false);
}

从这个起点开始,您可以为 Spliterator 接口的 default 方法添加覆盖,加权开发费用和潜在的性能改进,例如

static <T> Stream<T> generate(Supplier<T> s, long count) {
    return StreamSupport.stream(
               new Spliterators.AbstractSpliterator<T>(count, Spliterator.SIZED) {
        long remaining=count;

        public boolean tryAdvance(Consumer<? super T> action) {
            if(remaining<=0) return false;
            remaining--;
            action.accept(s.get());
            return true;
        }

        /** May improve the performance of most non-short-circuiting operations */
        @Override
        public void forEachRemaining(Consumer<? super T> action) {
            long toGo=remaining;
            remaining=0;
            for(; toGo>0; toGo--) action.accept(s.get());
        }
    }, false);
}

我已经为此创建了一个通用的解决方法

public class GuardedSpliterator<T> implements Spliterator<T> {

  final Supplier<? extends T> generator;

  final Predicate<T> termination;

  final boolean inclusive;

  public GuardedSpliterator(Supplier<? extends T> generator, Predicate<T> termination, boolean inclusive) {
    this.generator = generator;
    this.termination = termination;
    this.inclusive = inclusive;
  }

  @Override
  public boolean tryAdvance(Consumer<? super T> action) {
    T next = generator.get(); 
    boolean end = termination.test(next);
    if (inclusive || !end) {
      action.accept(next);
    }
    return !end;
  }

  @Override
  public Spliterator<T> trySplit() {
    throw new UnsupportedOperationException("Not supported yet.");
  }

  @Override
  public long estimateSize() {
    throw new UnsupportedOperationException("Not supported yet.");
  }

  @Override
  public int characteristics() {
    return Spliterator.ORDERED;
  }

}

用法很简单:

GuardedSpliterator<Integer> source = new GuardedSpliterator<>(
    ()  -> rnd.nextInt(),
    (i) -> i > 10,
    true
);

Stream<Integer> ints = StreamSupport.stream(source, false);

ints.forEach(i -> System.out.println(i));