Java 8: 停止缩减操作检查所有 Stream 元素

Java 8: stop reduction operation from examining all Stream elements

我想知道是否有一种方法可以在不检查整个流的情况下终止缩减操作,但我想不出办法。

用例大致如下:假设有一长串Integer需要折叠成一个Accumulator。每个元素检查都可能很昂贵,因此在 Accumulator 中,我对传入的 Accumulator 执行检查,看看我们是否需要执行昂贵的操作 - 如果我们不需要,那么我只是 return 累加器。

对于小型(呃)列表来说,这显然是一个很好的解决方案,但大型列表会产生不必要的流元素访问成本,我想避免。

这是一个代码草图 - 假设仅进行串行缩减。

class Accumulator {
    private final Set<A> setA = new HashSet<>;
    private final Set<B> setB = new HashSet<>;
}

class ResultSupplier implements Supplier<Result> {

    private final List<Integer> ids;

    @Override
    public Result get() {
        Accumulator acc = ids.stream().reduce(new Accumulator(), f(), (x, y) -> null);

        return (acc.setA.size > 1) ? Result.invalid() : Result.valid(acc.setB);
    }

    private static BiFunction<Accumulator, Integer, Accumulator> f() {
        return (acc, element) -> {
            if (acc.setA.size() <= 1) {
                // perform expensive ops and accumulate results
            }
            return acc;
        };
    }
}

除了必须遍历整个Stream之外,还有一个我不喜欢的事实——我必须检查相同的条件两次(即setA 尺码检查)。

我考虑过 map()collect() 操作,但它们看起来更像是相同的,并且没有发现它们实质上改变了我无法完成折叠操作的事实检查整个流。

此外,我的想法是虚构的 takeWhile(p : (A) => boolean) 流 API 通讯员也不会给我们带来任何好处,因为终止条件取决于累加器,而不是流元素本身。

请记住,我是 FP 的新手,所以 - 有没有办法让这项工作如我所料?是我设置不正确还是这个限制是设计使然?

当然,会有一个有趣的、纯粹的 FP 答案,可能有助于按照您的预期方式解决这个问题。

与此同时,当简单的解决方案在实用上势在必行并且您的原始数据源是 List 并且已经完全实现时,为什么还要使用 FP,并且您将使用串行缩减,而不是并行减少。改为这样写:

@Override
public Result get() {
    Accumulator acc = new Accumulator();

    for (Integer id : ids) {
        if (acc.setA.size() <= 1) {
            // perform expensive ops and accumulate results
        }

        // Easy:
        if (enough)
            break;
    }

    return (acc.setA.size > 1) ? Result.invalid() : Result.valid(acc.setB);
}

您可以

而不是从 ids.stream() 开始
  1. 使用ids.spliterator()
  2. 将生成的拆分器包装到具有可变布尔标志的自定义拆分器中
  3. 如果标志已更改,则自定义拆分器的 tryAdvance return 为 false
  4. 使用 StreamSupport.stream(Spliterator<T>, boolean)
  5. 将您的自定义拆分器转换为流
  6. 像以前一样继续您的流管道
  7. 当您的累加器已满时,通过切换布尔值来关闭流

添加一些静态辅助方法以保持其功能。

结果API可以看看这个

Accumulator acc = terminateableStream(ids, (stream, terminator) ->
   stream.reduce(new Accumulator(terminator), f(), (x, y) -> null));

Additionally, my thinking is that imaginary takeWhile(p : (A) => boolean) Stream API correspondent would also buy us nothing

如果条件取决于累加器状态而不是流成员,它确实有效。这基本上就是我上面概述的方法。

它可能在 JDK 提供的 takeWhile 中被禁止,但是使用拆分器的自定义实现可以自由地采用有状态的方法。

如评论所述:使用场景听起来有点可疑。一方面是因为reduce而不是collect的用法,另一方面是因为应该使用条件来停止减少也出现在累加器中。听起来简单地 将流限制 到一定数量的元素,或者基于条件,如 another question 所示,在这里可能更合适。

当然,在实际应用中,可能条件实际上与处理的元素个数无关。对于这种情况,我在这里草拟了一个解决方案,它基本上对应于 ,并且与上述问题的解决方案非常相似:它使用从 [=15= 创建的 Stream ] 除非满足停止条件,否则简单地委托给原始 Spliterator

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.function.BiFunction;
import java.util.function.Consumer;
import java.util.function.Supplier;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

public class StopStreamReduction
{
    public static void main(String[] args)
    {
        ResultSupplier r = new ResultSupplier();
        System.out.println(r.get());
    }
}

class Accumulator
{
    final Set<Integer> set = new HashSet<Integer>();
}

class ResultSupplier implements Supplier<String>
{
    private final List<Integer> ids;
    ResultSupplier()
    {
        ids = new ArrayList<Integer>(Collections.nCopies(20, 1));
    }

    public String get()
    {
        //return getOriginal();
        return getStopping();
    }

    private String getOriginal()
    {
        Accumulator acc =
            ids.stream().reduce(new Accumulator(), f(), (x, y) -> null);
        return (acc.set.size() > 11) ? "invalid" : String.valueOf(acc.set);
    }

    private String getStopping()
    {
        Spliterator<Integer> originalSpliterator = ids.spliterator();
        Accumulator accumulator = new Accumulator();
        Spliterator<Integer> stoppingSpliterator = 
            new Spliterators.AbstractSpliterator<Integer>(
                originalSpliterator.estimateSize(), 0)
            {
                @Override
                public boolean tryAdvance(Consumer<? super Integer> action)
                {
                    return accumulator.set.size() > 10 ? false : 
                        originalSpliterator.tryAdvance(action);
                }
            };
        Stream<Integer> stream = 
            StreamSupport.stream(stoppingSpliterator, false);
        Accumulator acc =
            stream.reduce(accumulator, f(), (x, y) -> null);
        return (acc.set.size() > 11) ? "invalid" : String.valueOf(acc.set);
    }

    private static int counter = 0;
    private static BiFunction<Accumulator, Integer, Accumulator> f()
    {
        return (acc, element) -> {

            System.out.print("Step " + counter);
            if (acc.set.size() <= 10)
            {
                System.out.print(" expensive");
                acc.set.add(counter);
            }
            System.out.println();
            counter++;
            return acc;
        };
    }
}

Edit in response to the comments:

当然可以写成"more functional"。然而,由于问题中的模糊描述和 "sketchy" 代码示例,很难在这里找到 "THE" 最合适的解决方案。 (并且 "appropriate" 指的是实际任务的具体注意事项, 指的是 how functional it should be 的问题而不牺牲可读性)。

可能的功能化步骤可能包括创建通用 StoppingSpliterator class,它在委托 Spliterator 上运行并以 Supplier<Boolean> 作为其停止条件,并提供这与实际 Accumulator 上的 Predicate 一起,并在各处使用一些实用方法和方法引用。

但再次强调:这是否真的是一个合适的解决方案,或者是否应该使用 ...

中的简单实用的解决方案,是值得商榷的
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.function.BiFunction;
import java.util.function.Consumer;
import java.util.function.Predicate;
import java.util.function.Supplier;
import java.util.stream.StreamSupport;

public class StopStreamReduction
{
    public static void main(String[] args)
    {
        List<Integer> collection = 
            new ArrayList<Integer>(Collections.nCopies(20, 1));
        System.out.println(compute(collection));
    }

    private static String compute(List<Integer> collection)
    {
        Predicate<Accumulator> stopCondition = (a) -> a.set.size() > 10;
        Accumulator result = reduceStopping(collection, 
            new Accumulator(), StopStreamReduction::accumulate, stopCondition);
        return (result.set.size() > 11) ? "invalid" : String.valueOf(result.set);
    }

    private static int counter;
    private static Accumulator accumulate(Accumulator a, Integer element)
    {
        System.out.print("Step " + counter);
        if (a.set.size() <= 10)
        {
            System.out.print(" expensive");
            a.set.add(counter);
        }
        System.out.println();
        counter++;
        return a;
    }

    static <U, T> U reduceStopping(
        Collection<T> collection, U identity,
        BiFunction<U, ? super T, U> accumulator,
        Predicate<U> stopCondition)
    {
       // This assumes that the accumulator always returns
       // the identity instance (with the accumulated values).
       // This may not always be true!
       return StreamSupport.stream(
           new StoppingSpliterator<T>(
               collection.spliterator(), 
               () -> stopCondition.test(identity)), false).
                   reduce(identity, accumulator, (x, y) -> null);
    }
}

class Accumulator
{
    final Set<Integer> set = new HashSet<Integer>();
}

class StoppingSpliterator<T> extends Spliterators.AbstractSpliterator<T>
{
    private final Spliterator<T> delegate;
    private final Supplier<Boolean> stopCondition;

    StoppingSpliterator(Spliterator<T> delegate, Supplier<Boolean> stopCondition)
    {
        super(delegate.estimateSize(), 0);
        this.delegate = delegate;
        this.stopCondition = stopCondition;
    }

    @Override
    public boolean tryAdvance(Consumer<? super T> action)
    {
        if (stopCondition.get())
        {
            return false;
        }
        return delegate.tryAdvance(action);
    }
}

我认为,可以从您的自定义收集器(或减少操作)中抛出一个特殊类型的 RuntimeException,它将结果合并到异常对象中,并在 collect 操作解包之外捕获它结果。我知道对非异常控制流使用异常不是惯用的,但即使对于并行流,它也应该适用于您的情况。

实际上,在很多情况下减少短路可能会有用。例如收集枚举值到EnumSet(发现所有可能的枚举值都收集完了就可以停止了)。或与 Stream<Set> 的所有元素相交(如果结果集在某个步骤后变为空,则可以停止:继续减少是无用的)。在内部有一个 SHORT_CIRCUIT 标志用于像 findFirst 这样的流操作,但它没有暴露给 public API.

没有真正的 FP 解决方案,只是因为您的整个累加器不是 FP。在这方面我们无法帮助您,因为我们不知道它实际上在做什么。我们所看到的是它依赖于两个可变集合,因此不能成为纯 FP 解决方案的一部分。

如果您接受这些限制并且没有 干净 使用 Stream API 的方法,您可能会争取 简单 方式。简单的方法包含一个有状态的 Predicate,这不是最好的,但有时是不可避免的:

public Result get() {
    int limit = 1;
    Set<A> setA=new HashSet<>();
    Set<B> setB=new HashSet<>();
    return ids.stream().anyMatch(i -> {
        // perform expensive ops and accumulate results
        return setA.size() > limit;
    })? Result.invalid(): Result.valid(setB);
}

但我想指出,鉴于您的特定逻辑,即当集合变得太大时,您的结果被视为 无效,您尝试处理不太多的元素是一个错误案例的优化。你不应该浪费精力优化它。如果有效结果是处理所有元素的结果,则处理所有元素…

我同意之前的所有回答。你通过强制减少可变累加器来做错了。此外,您所描述的过程不能表示为转换和缩减的管道。

如果你真的,真的需要用 FP 风格,我会按照@the8472 指出的那样去做。

无论如何,我给你一个新的更紧凑的替代方案,类似于@lukas-eder 的解决方案,使用迭代器:

Function<Integer, Integer> costlyComputation = Function.identity();

Accumulator acc = new Accumulator();

Iterator<Integer> ids = Arrays.asList(1, 2, 3).iterator();

while (!acc.hasEnough() && ids.hasNext())
  costlyComputation.andThen(acc::add).apply(ids.next());

您在这里对 FP 有两种不同的担​​忧:

如何停止迭代

由于您依赖于可变状态,FP 只会让您的生活更加艰难。您可以在外部迭代集合或按照我的建议使用 Iterator。

然后,使用 if() 停止迭代。

您可以想出不同的策略,但归根结底,这就是您正在使用的。

我更喜欢迭代器,因为它更加地道(在这种情况下能更好地表达您的意图)。

如何设计累加器和代价高昂的操作

这对我来说是最有趣的。

纯函数不能有状态,必须接收某些东西并且必须 return 某些东西,并且对于相同的输入总是相同的东西(如数学函数)。你能这样表达你的昂贵操作吗?

是否需要与累加器共享一些状态?也许那个共享不属于他们两个。

您会转换您的输入然后将其追加到累加器中,还是由累加器负责?将函数注入累加器是否有意义?