如何实现 Java 8 Stream 流利 API 和惰性计算

How to implement a Java 8 Stream fluent API and lazily computed

我想找出一个等价于 Java 8 Stream 的简单实现,这将使我能够探索惰性计算查询算法的发展(例如 map()filter()reduce()、等等)。 注意:实现比 Stream 更好的解决方案并不是我的目标。另一方面,我唯一的目标是了解 Stream 的内部结构。

然而,我发现的每个实现都是基于 Iterable<T>,例如以下答案中提供的解决方案:

然而,我对这些解决方案中的任何一个都不满意,因为:

  1. 它们太冗长了。
  2. 它们对于新的查询方法不灵活。包含新的查询方法需要进行结构修改。
  3. 尽管有查询参数,但它们没有利用任何新的 Java 8 功能,例如:第一个 class 函数或默认方法。
  4. none 他们使用 Spliterator<T> 方法,Stream<T>.

我知道 Spliterator<T> 旨在允许分区和并行处理,但我认为其独特的迭代器方法 (boolean tryAdvance(Consumer<t>)) 可以被用于比上面列出的那些新的替代方法。此外,作为 :

Spliterator is a better Iterator, even without parallelism. (They're also generally just easier to write and harder to get wrong.)

那么,是否有可能开发一个更具可读性、更简单、简洁和灵活的查询实现 API 延迟计算并基于 Stream<T> 的相同原则(除了并行处理部分)?

如果是,你怎么做?我希望看到比上面列出的那些实现更简单的实现,并且如果可能的话利用新的 Java 8 功能。

要求:

我的问题的原因? 我认为学习查询的最佳方法 API 例如 Stream 是尝试自己实现这些相同的方法。我在学习.net Linq的时候就已经成功了。当然,我没有实现比 Linq 更好的实现,但这帮助我理解了内部部分。所以,我正在尝试按照相同的方法来学习 Stream。

这并不少见。有许多针对其他技术采用这种方法的研讨会,例如 functional-javascript-workshop,其中大多数练习要求实施现有方法,例如:map()filter()reduce() , call(), bind(), 等等...

已选答案:目前我考虑过 as my choice, instead of because the latter does not allow the implementaton of findAny() or findFirst() without completely traversing whole elements through the forEach() of dataSrc. However, I think that have other merits regarding the concise implementation of some intermediate operations and also on performance, since the forEach() based approach, reduces the overhead of the iteration code that mediates access to the data structure internals as cited by

使用函数式编程并利用 Java 8 个默认方法,我们可以实现 API 惰性计算查询的简短而干净的解决方案。例如,检查如何在下面的 Queryable 类型中轻松实现 map()forEach() 方法,然后您可以像这样使用它:

List<String> data = Arrays.asList("functional", "super", "formula");
Queryable.of(data) // <=> data.stream().
     .map(String::length)
     .forEach(System.out::println);

如果您将 Queryable.of(dataSrc) 调用替换为 dataSrc.stream(),您将得到相同的结果。以下示例说明了 map()forEach() 方法的实现。在 Queryable repository.

查看完整的解决方案和更详细的描述

更新 @srborlongan 。将 forEach 签名从 forEach(Consumer<T>) 更改为 forEach(Consumer<? super T>),将 ofof(Collection<T>) 更改为 of(Iterable<T>)

@FunctionalInterface
public interface Queryable<T>{

  abstract boolean tryAdvance(Consumer<? super T> action); // <=> Spliterator::tryAdvance

  static <T> boolean truth(Consumer<T> c, T item){
    c.accept(item);
    return true;
  }

  public static <T> Queryable<T> of(Iterable<T> data) {
    final Iterator<T> dataSrc = data.iterator();
    return action -> dataSrc.hasNext() && truth(action, dataSrc.next());
  }

  public default void forEach(Consumer<? super T> action) {
    while (tryAdvance(action)) { }
  }

  public default <R> Queryable<R> map(Function<T, R> mapper) {
    return action -> tryAdvance(item -> action.accept(mapper.apply(item)));
  }
}

在没有短路支持的情况下实现无状态操作的子集非常容易。你只需要注意始终坚持内部迭代。基本构建块是 forEach 操作,它可以对每个输入元素执行给定的操作。 forEach 方法的主体是唯一在不同阶段发生变化的东西。所以我们要么用抽象的forEach方法制作抽象的class,要么接受一个实际上是forEach的主体的函数。我会坚持使用第二种方法:

public final class MyStream<T> {
    private final Consumer<Consumer<T>> action;

    public MyStream(Consumer<Consumer<T>> action) {
        this.action = action;
    }

    public void forEach(Consumer<T> cons) {
        action.accept(cons);
    }
}

现在让我们创建一些简单的来源:

public static <T> MyStream<T> of(Iterable<T> elements) {
    // just redirect to Iterable::forEach
    return new MyStream<>(elements::forEach);
}

@SafeVarargs
public static <T> MyStream<T> of(T... elements) {
    return of(Arrays.asList(elements));
}

public static MyStream<Integer> range(int from, int to) {
    return new MyStream<>(cons -> {
        for(int i=from; i<to; i++) cons.accept(i);
    });
}

现在是中间操作。他们只需要调整 action 收到的消费者来执行其他操作:

public <U> MyStream<U> map(Function<T, U> mapper) {
    return new MyStream<>(cons -> forEach(e -> cons.accept(mapper.apply(e))));
}

public MyStream<T> filter(Predicate<T> pred) {
    return new MyStream<>(cons -> forEach(e -> {
        if(pred.test(e))
            cons.accept(e);
    }));
}

public <U> MyStream<U> flatMap(Function<T, MyStream<U>> mapper) {
    return new MyStream<>(cons -> forEach(e -> mapper.apply(e).forEach(cons)));
}

public MyStream<T> peek(Consumer<T> action) {
    return new MyStream<>(cons -> forEach(e -> {
        action.accept(e);
        cons.accept(e);
    }));
}

public MyStream<T> skip(long n) {
    return new MyStream<>(cons -> {
        long[] count = {0};
        forEach(e -> {
            if(++count[0] > n)
                cons.accept(e);
        });
    });
}

现在让我们使用 forEach:

创建一些终端操作
public T reduce(T identity, BinaryOperator<T> op) {
    class Box {
        T val = identity;
    }
    Box b = new Box();
    forEach(e -> b.val = op.apply(b.val, e));
    return b.val;
}

public Optional<T> reduce(BinaryOperator<T> op) {
    class Box {
        boolean isPresent;
        T val;
    }
    Box b = new Box();
    forEach(e -> {
        if(b.isPresent) b.val = op.apply(b.val, e);
        else {
            b.val = e;
            b.isPresent = true;
        }
    });
    return b.isPresent ? Optional.empty() : Optional.of(b.val);
}

public long count() {
    return map(e -> 1L).reduce(0L, Long::sum);
}

public Optional<T> maxBy(Comparator<T> cmp) {
    return reduce(BinaryOperator.maxBy(cmp));
}

public Optional<T> minBy(Comparator<T> cmp) {
    return reduce(BinaryOperator.minBy(cmp));
}

现在我们有了直播。让我们试试看:

System.out.println(MyStream.of(1,2,3,4,5).map(x -> x*2)
                           .reduce(0, Integer::sum));
// 30

System.out.println(MyStream.of("a", "stream", "of", "some", "strings")
                           .flatMap(x -> MyStream.of(", ", x))
                           .skip(1).reduce("", String::concat));
// a, stream, of, some, strings

System.out.println(MyStream.range(0, 100)
                           .filter(x -> x % 3 == 0).count());
// 34

等等。这样的实现非常简单,但非常接近实际 Stream API 中发生的事情。当然,当你添加短路、并行流、原始特化和更多有状态操作时,事情会复杂得多。

请注意,与 Stream API 不同,此 MyStream 可以重复使用多次:

MyStream<Integer> range = range(0, 10);
range.forEach(System.out::println);
range.forEach(System.out::println); // works perfectly

首先,我不得不说我喜欢 Lambdas 和 Stream APIs 的设计。 JDK 中的实现也很棒且性能很高。而且我不确定您自学 implementation/doing 的目的是否正确。但是我确实在我的开源库 AbacusUtil, both sequential and parallel. Here are the source codes at github: Stream 中实现了 Stream API。我不能说它有多好,比较 JDK 中的实现。但就个人而言,我认为实施非常简单明了。而且它的性能也很高。

披露:我是 AbacusUtil 的开发者。