如何实现 Java 8 Stream 流利 API 和惰性计算
How to implement a Java 8 Stream fluent API and lazily computed
我想找出一个等价于 Java 8 Stream 的简单实现,这将使我能够探索惰性计算查询算法的发展(例如 map()
、filter()
、reduce()
、等等)。 注意:实现比 Stream 更好的解决方案并不是我的目标。另一方面,我唯一的目标是了解 Stream 的内部结构。
然而,我发现的每个实现都是基于 Iterable<T>
,例如以下答案中提供的解决方案:
然而,我对这些解决方案中的任何一个都不满意,因为:
- 它们太冗长了。
- 它们对于新的查询方法不灵活。包含新的查询方法需要进行结构修改。
- 尽管有查询参数,但它们没有利用任何新的 Java 8 功能,例如:第一个 class 函数或默认方法。
- none 他们使用
Spliterator<T>
方法,Stream<T>
.
我知道 Spliterator<T>
旨在允许分区和并行处理,但我认为其独特的迭代器方法 (boolean tryAdvance(Consumer<t>)
) 可以被用于比上面列出的那些新的替代方法。此外,作为 :
Spliterator
is a better Iterator
, even without parallelism. (They're also generally just easier to write and harder to get wrong.)
那么,是否有可能开发一个更具可读性、更简单、简洁和灵活的查询实现 API 延迟计算并基于 Stream<T>
的相同原则(除了并行处理部分)?
如果是,你怎么做?我希望看到比上面列出的那些实现更简单的实现,并且如果可能的话利用新的 Java 8 功能。
要求:
- 不要重用 Java 8 API
中的现有方法
- 并行处理功能超出了这个问题的范围。
- 如果可能,最好不要使用
Iterable<T>
方法。
我的问题的原因? 我认为学习查询的最佳方法 API 例如 Stream 是尝试自己实现这些相同的方法。我在学习.net Linq的时候就已经成功了。当然,我没有实现比 Linq 更好的实现,但这帮助我理解了内部部分。所以,我正在尝试按照相同的方法来学习 Stream。
这并不少见。有许多针对其他技术采用这种方法的研讨会,例如 functional-javascript-workshop,其中大多数练习要求实施现有方法,例如:map()
、filter()
、reduce()
, call()
, bind()
, 等等...
已选答案:目前我考虑过 as my choice, instead of because the latter does not allow the implementaton of findAny()
or findFirst()
without completely traversing whole elements through the forEach()
of dataSrc
. However, I think that have other merits regarding the concise implementation of some intermediate operations and also on performance, since the forEach()
based approach, reduces the overhead of the iteration code that mediates access to the data structure internals as cited by
使用函数式编程并利用 Java 8 个默认方法,我们可以实现 API 惰性计算查询的简短而干净的解决方案。例如,检查如何在下面的 Queryable
类型中轻松实现 map()
和 forEach()
方法,然后您可以像这样使用它:
List<String> data = Arrays.asList("functional", "super", "formula");
Queryable.of(data) // <=> data.stream().
.map(String::length)
.forEach(System.out::println);
如果您将 Queryable.of(dataSrc)
调用替换为 dataSrc.stream()
,您将得到相同的结果。以下示例说明了 map()
和 forEach()
方法的实现。在 Queryable repository.
查看完整的解决方案和更详细的描述
更新 @srborlongan 。将 forEach
签名从 forEach(Consumer<T>)
更改为 forEach(Consumer<? super T>)
,将 of
从 of(Collection<T>)
更改为 of(Iterable<T>)
@FunctionalInterface
public interface Queryable<T>{
abstract boolean tryAdvance(Consumer<? super T> action); // <=> Spliterator::tryAdvance
static <T> boolean truth(Consumer<T> c, T item){
c.accept(item);
return true;
}
public static <T> Queryable<T> of(Iterable<T> data) {
final Iterator<T> dataSrc = data.iterator();
return action -> dataSrc.hasNext() && truth(action, dataSrc.next());
}
public default void forEach(Consumer<? super T> action) {
while (tryAdvance(action)) { }
}
public default <R> Queryable<R> map(Function<T, R> mapper) {
return action -> tryAdvance(item -> action.accept(mapper.apply(item)));
}
}
在没有短路支持的情况下实现无状态操作的子集非常容易。你只需要注意始终坚持内部迭代。基本构建块是 forEach
操作,它可以对每个输入元素执行给定的操作。 forEach
方法的主体是唯一在不同阶段发生变化的东西。所以我们要么用抽象的forEach
方法制作抽象的class,要么接受一个实际上是forEach
的主体的函数。我会坚持使用第二种方法:
public final class MyStream<T> {
private final Consumer<Consumer<T>> action;
public MyStream(Consumer<Consumer<T>> action) {
this.action = action;
}
public void forEach(Consumer<T> cons) {
action.accept(cons);
}
}
现在让我们创建一些简单的来源:
public static <T> MyStream<T> of(Iterable<T> elements) {
// just redirect to Iterable::forEach
return new MyStream<>(elements::forEach);
}
@SafeVarargs
public static <T> MyStream<T> of(T... elements) {
return of(Arrays.asList(elements));
}
public static MyStream<Integer> range(int from, int to) {
return new MyStream<>(cons -> {
for(int i=from; i<to; i++) cons.accept(i);
});
}
现在是中间操作。他们只需要调整 action
收到的消费者来执行其他操作:
public <U> MyStream<U> map(Function<T, U> mapper) {
return new MyStream<>(cons -> forEach(e -> cons.accept(mapper.apply(e))));
}
public MyStream<T> filter(Predicate<T> pred) {
return new MyStream<>(cons -> forEach(e -> {
if(pred.test(e))
cons.accept(e);
}));
}
public <U> MyStream<U> flatMap(Function<T, MyStream<U>> mapper) {
return new MyStream<>(cons -> forEach(e -> mapper.apply(e).forEach(cons)));
}
public MyStream<T> peek(Consumer<T> action) {
return new MyStream<>(cons -> forEach(e -> {
action.accept(e);
cons.accept(e);
}));
}
public MyStream<T> skip(long n) {
return new MyStream<>(cons -> {
long[] count = {0};
forEach(e -> {
if(++count[0] > n)
cons.accept(e);
});
});
}
现在让我们使用 forEach
:
创建一些终端操作
public T reduce(T identity, BinaryOperator<T> op) {
class Box {
T val = identity;
}
Box b = new Box();
forEach(e -> b.val = op.apply(b.val, e));
return b.val;
}
public Optional<T> reduce(BinaryOperator<T> op) {
class Box {
boolean isPresent;
T val;
}
Box b = new Box();
forEach(e -> {
if(b.isPresent) b.val = op.apply(b.val, e);
else {
b.val = e;
b.isPresent = true;
}
});
return b.isPresent ? Optional.empty() : Optional.of(b.val);
}
public long count() {
return map(e -> 1L).reduce(0L, Long::sum);
}
public Optional<T> maxBy(Comparator<T> cmp) {
return reduce(BinaryOperator.maxBy(cmp));
}
public Optional<T> minBy(Comparator<T> cmp) {
return reduce(BinaryOperator.minBy(cmp));
}
现在我们有了直播。让我们试试看:
System.out.println(MyStream.of(1,2,3,4,5).map(x -> x*2)
.reduce(0, Integer::sum));
// 30
System.out.println(MyStream.of("a", "stream", "of", "some", "strings")
.flatMap(x -> MyStream.of(", ", x))
.skip(1).reduce("", String::concat));
// a, stream, of, some, strings
System.out.println(MyStream.range(0, 100)
.filter(x -> x % 3 == 0).count());
// 34
等等。这样的实现非常简单,但非常接近实际 Stream API 中发生的事情。当然,当你添加短路、并行流、原始特化和更多有状态操作时,事情会复杂得多。
请注意,与 Stream API 不同,此 MyStream
可以重复使用多次:
MyStream<Integer> range = range(0, 10);
range.forEach(System.out::println);
range.forEach(System.out::println); // works perfectly
首先,我不得不说我喜欢 Lambdas 和 Stream APIs 的设计。 JDK 中的实现也很棒且性能很高。而且我不确定您自学 implementation/doing 的目的是否正确。但是我确实在我的开源库 AbacusUtil, both sequential and parallel. Here are the source codes at github: Stream 中实现了 Stream API。我不能说它有多好,比较 JDK 中的实现。但就个人而言,我认为实施非常简单明了。而且它的性能也很高。
披露:我是 AbacusUtil 的开发者。
我想找出一个等价于 Java 8 Stream 的简单实现,这将使我能够探索惰性计算查询算法的发展(例如 map()
、filter()
、reduce()
、等等)。 注意:实现比 Stream 更好的解决方案并不是我的目标。另一方面,我唯一的目标是了解 Stream 的内部结构。
然而,我发现的每个实现都是基于 Iterable<T>
,例如以下答案中提供的解决方案:
然而,我对这些解决方案中的任何一个都不满意,因为:
- 它们太冗长了。
- 它们对于新的查询方法不灵活。包含新的查询方法需要进行结构修改。
- 尽管有查询参数,但它们没有利用任何新的 Java 8 功能,例如:第一个 class 函数或默认方法。
- none 他们使用
Spliterator<T>
方法,Stream<T>
.
我知道 Spliterator<T>
旨在允许分区和并行处理,但我认为其独特的迭代器方法 (boolean tryAdvance(Consumer<t>)
) 可以被用于比上面列出的那些新的替代方法。此外,作为
Spliterator
is a betterIterator
, even without parallelism. (They're also generally just easier to write and harder to get wrong.)
那么,是否有可能开发一个更具可读性、更简单、简洁和灵活的查询实现 API 延迟计算并基于 Stream<T>
的相同原则(除了并行处理部分)?
如果是,你怎么做?我希望看到比上面列出的那些实现更简单的实现,并且如果可能的话利用新的 Java 8 功能。
要求:
- 不要重用 Java 8 API 中的现有方法
- 并行处理功能超出了这个问题的范围。
- 如果可能,最好不要使用
Iterable<T>
方法。
我的问题的原因? 我认为学习查询的最佳方法 API 例如 Stream 是尝试自己实现这些相同的方法。我在学习.net Linq的时候就已经成功了。当然,我没有实现比 Linq 更好的实现,但这帮助我理解了内部部分。所以,我正在尝试按照相同的方法来学习 Stream。
这并不少见。有许多针对其他技术采用这种方法的研讨会,例如 functional-javascript-workshop,其中大多数练习要求实施现有方法,例如:map()
、filter()
、reduce()
, call()
, bind()
, 等等...
已选答案:目前我考虑过findAny()
or findFirst()
without completely traversing whole elements through the forEach()
of dataSrc
. However, I think that forEach()
based approach, reduces the overhead of the iteration code that mediates access to the data structure internals as cited by
使用函数式编程并利用 Java 8 个默认方法,我们可以实现 API 惰性计算查询的简短而干净的解决方案。例如,检查如何在下面的 Queryable
类型中轻松实现 map()
和 forEach()
方法,然后您可以像这样使用它:
List<String> data = Arrays.asList("functional", "super", "formula");
Queryable.of(data) // <=> data.stream().
.map(String::length)
.forEach(System.out::println);
如果您将 Queryable.of(dataSrc)
调用替换为 dataSrc.stream()
,您将得到相同的结果。以下示例说明了 map()
和 forEach()
方法的实现。在 Queryable repository.
更新 @srborlongan forEach
签名从 forEach(Consumer<T>)
更改为 forEach(Consumer<? super T>)
,将 of
从 of(Collection<T>)
更改为 of(Iterable<T>)
@FunctionalInterface
public interface Queryable<T>{
abstract boolean tryAdvance(Consumer<? super T> action); // <=> Spliterator::tryAdvance
static <T> boolean truth(Consumer<T> c, T item){
c.accept(item);
return true;
}
public static <T> Queryable<T> of(Iterable<T> data) {
final Iterator<T> dataSrc = data.iterator();
return action -> dataSrc.hasNext() && truth(action, dataSrc.next());
}
public default void forEach(Consumer<? super T> action) {
while (tryAdvance(action)) { }
}
public default <R> Queryable<R> map(Function<T, R> mapper) {
return action -> tryAdvance(item -> action.accept(mapper.apply(item)));
}
}
在没有短路支持的情况下实现无状态操作的子集非常容易。你只需要注意始终坚持内部迭代。基本构建块是 forEach
操作,它可以对每个输入元素执行给定的操作。 forEach
方法的主体是唯一在不同阶段发生变化的东西。所以我们要么用抽象的forEach
方法制作抽象的class,要么接受一个实际上是forEach
的主体的函数。我会坚持使用第二种方法:
public final class MyStream<T> {
private final Consumer<Consumer<T>> action;
public MyStream(Consumer<Consumer<T>> action) {
this.action = action;
}
public void forEach(Consumer<T> cons) {
action.accept(cons);
}
}
现在让我们创建一些简单的来源:
public static <T> MyStream<T> of(Iterable<T> elements) {
// just redirect to Iterable::forEach
return new MyStream<>(elements::forEach);
}
@SafeVarargs
public static <T> MyStream<T> of(T... elements) {
return of(Arrays.asList(elements));
}
public static MyStream<Integer> range(int from, int to) {
return new MyStream<>(cons -> {
for(int i=from; i<to; i++) cons.accept(i);
});
}
现在是中间操作。他们只需要调整 action
收到的消费者来执行其他操作:
public <U> MyStream<U> map(Function<T, U> mapper) {
return new MyStream<>(cons -> forEach(e -> cons.accept(mapper.apply(e))));
}
public MyStream<T> filter(Predicate<T> pred) {
return new MyStream<>(cons -> forEach(e -> {
if(pred.test(e))
cons.accept(e);
}));
}
public <U> MyStream<U> flatMap(Function<T, MyStream<U>> mapper) {
return new MyStream<>(cons -> forEach(e -> mapper.apply(e).forEach(cons)));
}
public MyStream<T> peek(Consumer<T> action) {
return new MyStream<>(cons -> forEach(e -> {
action.accept(e);
cons.accept(e);
}));
}
public MyStream<T> skip(long n) {
return new MyStream<>(cons -> {
long[] count = {0};
forEach(e -> {
if(++count[0] > n)
cons.accept(e);
});
});
}
现在让我们使用 forEach
:
public T reduce(T identity, BinaryOperator<T> op) {
class Box {
T val = identity;
}
Box b = new Box();
forEach(e -> b.val = op.apply(b.val, e));
return b.val;
}
public Optional<T> reduce(BinaryOperator<T> op) {
class Box {
boolean isPresent;
T val;
}
Box b = new Box();
forEach(e -> {
if(b.isPresent) b.val = op.apply(b.val, e);
else {
b.val = e;
b.isPresent = true;
}
});
return b.isPresent ? Optional.empty() : Optional.of(b.val);
}
public long count() {
return map(e -> 1L).reduce(0L, Long::sum);
}
public Optional<T> maxBy(Comparator<T> cmp) {
return reduce(BinaryOperator.maxBy(cmp));
}
public Optional<T> minBy(Comparator<T> cmp) {
return reduce(BinaryOperator.minBy(cmp));
}
现在我们有了直播。让我们试试看:
System.out.println(MyStream.of(1,2,3,4,5).map(x -> x*2)
.reduce(0, Integer::sum));
// 30
System.out.println(MyStream.of("a", "stream", "of", "some", "strings")
.flatMap(x -> MyStream.of(", ", x))
.skip(1).reduce("", String::concat));
// a, stream, of, some, strings
System.out.println(MyStream.range(0, 100)
.filter(x -> x % 3 == 0).count());
// 34
等等。这样的实现非常简单,但非常接近实际 Stream API 中发生的事情。当然,当你添加短路、并行流、原始特化和更多有状态操作时,事情会复杂得多。
请注意,与 Stream API 不同,此 MyStream
可以重复使用多次:
MyStream<Integer> range = range(0, 10);
range.forEach(System.out::println);
range.forEach(System.out::println); // works perfectly
首先,我不得不说我喜欢 Lambdas 和 Stream APIs 的设计。 JDK 中的实现也很棒且性能很高。而且我不确定您自学 implementation/doing 的目的是否正确。但是我确实在我的开源库 AbacusUtil, both sequential and parallel. Here are the source codes at github: Stream 中实现了 Stream API。我不能说它有多好,比较 JDK 中的实现。但就个人而言,我认为实施非常简单明了。而且它的性能也很高。
披露:我是 AbacusUtil 的开发者。