流如何停止?
How do streams stop?
我想知道当我使用 Stream.generate
创建自己的无限流时,标准库中的流如何停止...
例如,当您有一个包含记录的列表时:
List<Record> records = getListWithRecords();
records.stream().forEach(/* do something */);
流不会是无限的并且运行永远,但是当遍历列表中的所有项目时它会停止。但它是如何工作的呢?相同的功能适用于 Files.lines(path)
创建的流(来源:http://www.mkyong.com/java8/java-8-stream-read-a-file-line-by-line/)。
第二个问题,如何以相同的方式停止使用 Stream.generate
创建的流?
有限流根本不是通过 Stream.generate
创建的。
实现流的标准方法是实现 Spliterator
, sometimes using the Iterator
detour。在任何一种情况下,实现都有一种方法来报告结束,例如当 Spliterator.tryAdvance
returns false
或其 forEachRemaining
方法只是 returns,或者在 Iterator
源的情况下,当 hasNext()
returns false
.
A Spliterator
甚至可以在处理开始之前报告预期的元素数量。
流,通过 Stream
接口内的工厂方法之一创建,如 Stream.generate
也可以通过 Spliterator
或使用流的内部功能来实现实现,但无论它们是如何实现的,您都无法通过这种实现来改变它们的行为,因此使这种流成为有限流的唯一方法是将 limit
操作链接到流。
如果您想创建一个不受数组或集合支持的非空有限流,并且 none 现有流源适合,您必须实现自己的 Spliterator
和create a stream out of it。如上所述,您可以使用现有的方法从 Iterator
中创建 Spliterator
,但是您应该抵制仅仅因为它很熟悉就使用 Iterator
的诱惑。一个Spliterator
不难实现:
/** like {@code Stream.generate}, but with an intrinsic limit */
static <T> Stream<T> generate(Supplier<T> s, long count) {
return StreamSupport.stream(
new Spliterators.AbstractSpliterator<T>(count, Spliterator.SIZED) {
long remaining=count;
public boolean tryAdvance(Consumer<? super T> action) {
if(remaining<=0) return false;
remaining--;
action.accept(s.get());
return true;
}
}, false);
}
从这个起点开始,您可以为 Spliterator
接口的 default
方法添加覆盖,加权开发费用和潜在的性能改进,例如
static <T> Stream<T> generate(Supplier<T> s, long count) {
return StreamSupport.stream(
new Spliterators.AbstractSpliterator<T>(count, Spliterator.SIZED) {
long remaining=count;
public boolean tryAdvance(Consumer<? super T> action) {
if(remaining<=0) return false;
remaining--;
action.accept(s.get());
return true;
}
/** May improve the performance of most non-short-circuiting operations */
@Override
public void forEachRemaining(Consumer<? super T> action) {
long toGo=remaining;
remaining=0;
for(; toGo>0; toGo--) action.accept(s.get());
}
}, false);
}
我已经为此创建了一个通用的解决方法
public class GuardedSpliterator<T> implements Spliterator<T> {
final Supplier<? extends T> generator;
final Predicate<T> termination;
final boolean inclusive;
public GuardedSpliterator(Supplier<? extends T> generator, Predicate<T> termination, boolean inclusive) {
this.generator = generator;
this.termination = termination;
this.inclusive = inclusive;
}
@Override
public boolean tryAdvance(Consumer<? super T> action) {
T next = generator.get();
boolean end = termination.test(next);
if (inclusive || !end) {
action.accept(next);
}
return !end;
}
@Override
public Spliterator<T> trySplit() {
throw new UnsupportedOperationException("Not supported yet.");
}
@Override
public long estimateSize() {
throw new UnsupportedOperationException("Not supported yet.");
}
@Override
public int characteristics() {
return Spliterator.ORDERED;
}
}
用法很简单:
GuardedSpliterator<Integer> source = new GuardedSpliterator<>(
() -> rnd.nextInt(),
(i) -> i > 10,
true
);
Stream<Integer> ints = StreamSupport.stream(source, false);
ints.forEach(i -> System.out.println(i));
我想知道当我使用 Stream.generate
创建自己的无限流时,标准库中的流如何停止...
例如,当您有一个包含记录的列表时:
List<Record> records = getListWithRecords();
records.stream().forEach(/* do something */);
流不会是无限的并且运行永远,但是当遍历列表中的所有项目时它会停止。但它是如何工作的呢?相同的功能适用于 Files.lines(path)
创建的流(来源:http://www.mkyong.com/java8/java-8-stream-read-a-file-line-by-line/)。
第二个问题,如何以相同的方式停止使用 Stream.generate
创建的流?
有限流根本不是通过 Stream.generate
创建的。
实现流的标准方法是实现 Spliterator
, sometimes using the Iterator
detour。在任何一种情况下,实现都有一种方法来报告结束,例如当 Spliterator.tryAdvance
returns false
或其 forEachRemaining
方法只是 returns,或者在 Iterator
源的情况下,当 hasNext()
returns false
.
A Spliterator
甚至可以在处理开始之前报告预期的元素数量。
流,通过 Stream
接口内的工厂方法之一创建,如 Stream.generate
也可以通过 Spliterator
或使用流的内部功能来实现实现,但无论它们是如何实现的,您都无法通过这种实现来改变它们的行为,因此使这种流成为有限流的唯一方法是将 limit
操作链接到流。
如果您想创建一个不受数组或集合支持的非空有限流,并且 none 现有流源适合,您必须实现自己的 Spliterator
和create a stream out of it。如上所述,您可以使用现有的方法从 Iterator
中创建 Spliterator
,但是您应该抵制仅仅因为它很熟悉就使用 Iterator
的诱惑。一个Spliterator
不难实现:
/** like {@code Stream.generate}, but with an intrinsic limit */
static <T> Stream<T> generate(Supplier<T> s, long count) {
return StreamSupport.stream(
new Spliterators.AbstractSpliterator<T>(count, Spliterator.SIZED) {
long remaining=count;
public boolean tryAdvance(Consumer<? super T> action) {
if(remaining<=0) return false;
remaining--;
action.accept(s.get());
return true;
}
}, false);
}
从这个起点开始,您可以为 Spliterator
接口的 default
方法添加覆盖,加权开发费用和潜在的性能改进,例如
static <T> Stream<T> generate(Supplier<T> s, long count) {
return StreamSupport.stream(
new Spliterators.AbstractSpliterator<T>(count, Spliterator.SIZED) {
long remaining=count;
public boolean tryAdvance(Consumer<? super T> action) {
if(remaining<=0) return false;
remaining--;
action.accept(s.get());
return true;
}
/** May improve the performance of most non-short-circuiting operations */
@Override
public void forEachRemaining(Consumer<? super T> action) {
long toGo=remaining;
remaining=0;
for(; toGo>0; toGo--) action.accept(s.get());
}
}, false);
}
我已经为此创建了一个通用的解决方法
public class GuardedSpliterator<T> implements Spliterator<T> {
final Supplier<? extends T> generator;
final Predicate<T> termination;
final boolean inclusive;
public GuardedSpliterator(Supplier<? extends T> generator, Predicate<T> termination, boolean inclusive) {
this.generator = generator;
this.termination = termination;
this.inclusive = inclusive;
}
@Override
public boolean tryAdvance(Consumer<? super T> action) {
T next = generator.get();
boolean end = termination.test(next);
if (inclusive || !end) {
action.accept(next);
}
return !end;
}
@Override
public Spliterator<T> trySplit() {
throw new UnsupportedOperationException("Not supported yet.");
}
@Override
public long estimateSize() {
throw new UnsupportedOperationException("Not supported yet.");
}
@Override
public int characteristics() {
return Spliterator.ORDERED;
}
}
用法很简单:
GuardedSpliterator<Integer> source = new GuardedSpliterator<>(
() -> rnd.nextInt(),
(i) -> i > 10,
true
);
Stream<Integer> ints = StreamSupport.stream(source, false);
ints.forEach(i -> System.out.println(i));