How to reduce/iterate multiple Java streams of the same size at once?

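I have multiple Java streams of the same length, and I want to perform some operation on the corresponding elements of each stream. For example, add the first elements of all streams, the second elements of all streams, the third elements of all streams, and so on.

How can we do this without first reducing (collecting) each stream?

As a minimal reproducible example, I have the following test snippet: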

@Test
void aggregateMultipleStreams() {
    Stream<Integer> s1 = Stream.of(1, 2);
    Stream<Integer> s2 = Stream.of(4, 5);
    Stream<Integer> s3 = Stream.of(7, 8);
    assertEquals(List.of(1 + 4 + 7, 2 + 5 + 8), aggregate(s1, s2, s3, 2));
}

I can write the aggregate method as follows, by first reducing all the streams to lists.

private List<Integer> aggregate(Stream<Integer> s1, Stream<Integer> s2, Stream<Integer> s3, int streamSize) {
    final List<List<Integer>> reduced = Stream.of(s1, s2, s3)
            .map(s -> s.collect(Collectors.toList())).collect(Collectors.toList());
    return IntStream.range(0, streamSize).mapToObj(n -> IntStream.range(0, reduced.size())
            .map(v -> reduced.get(v).get(n)).sum()).collect(Collectors.toList());
}

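But this could become a storage problem if each stream contains many records: for N records per stream, we need 3N storage here.

Is it possible to add the corresponding elements of different streams without reducing them first? Can we reduce multiple streams at once in Java?

After implementing @jb_dk's solution below, the solution snippet became: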

private List<Integer> aggregate(Stream<Integer> s1, Stream<Integer> s2, Stream<Integer> s3, int streamSize) {
    final List<Iterator<Integer>> iterators = Stream.of(s1, s2, s3)
            .map(Stream::iterator).collect(Collectors.toList());
    return IntStream.range(0, streamSize).mapToObj(n -> IntStream.range(0, iterators.size())
            .map(v -> iterators.get(v).next()).sum()).collect(Collectors.toList());
}

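Form an array of input stream objects instead of 3 named variables, then create an output list or stream. Use an outer loop over the stream length and an inner loop that iterates over the array of input streams, reads one element from each, and adds it to the current output element.

Something like this (code not tested, may contain syntax errors):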

    ...
    {
        List<Integer> results = new ArrayList<>(); // NOT final, can be a stream builder instead
        final List<Stream<Integer>> instrms = List.of(s1, s2, s3);
        // Get the iterator (not rewindable) for each stream
        final List<Iterator<Integer>> initers = new ArrayList<>();
        for (Stream<Integer> instrm : instrms) {
            initers.add(instrm.iterator());
        }
        // Actually loop over the stream elements, outputting one
        //    sum element for each element.  Assumes all input streams
        //    are same length as the first one.
        while (initers.get(0).hasNext()) {
            int res1 = 0;
            for (Iterator<Integer> initer : initers) {
                res1 += initer.next();
            }
            results.add(res1);
        }
        return results;  // results.build() if a stream builder
    }
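
For reference, the loop above can be wrapped into a class that compiles and runs on its own. This is only a sketch: the class name `IteratorSum` and the varargs method `sumByPosition` are made up for illustration, and it assumes, as in the question, that every stream has the same length as the first one.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.stream.Stream;

public class IteratorSum {

    // Hypothetical helper: sums the elements found at the same position of each stream.
    // Assumes all streams have the same length as the first one.
    @SafeVarargs
    static List<Integer> sumByPosition(Stream<Integer>... streams) {
        List<Iterator<Integer>> iterators = new ArrayList<>();
        for (Stream<Integer> stream : streams) {
            iterators.add(stream.iterator());
        }
        List<Integer> results = new ArrayList<>();
        // With no streams at all there is nothing to sum
        while (!iterators.isEmpty() && iterators.get(0).hasNext()) {
            int sum = 0;
            for (Iterator<Integer> iterator : iterators) {
                sum += iterator.next(); // pull one element from each stream per round
            }
            results.add(sum);
        }
        return results;
    }

    public static void main(String[] args) {
        List<Integer> sums = sumByPosition(Stream.of(1, 2), Stream.of(4, 5), Stream.of(7, 8));
        System.out.println(sums); // [12, 15]
    }
}
```

The input streams are consumed through their iterators, so they are not first collected into intermediate lists; only the result list grows with the stream length.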

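This is basically a zip operation, where the element at position i of each stream is added to the corresponding i-th element of the other streams.

You can achieve what you need by obtaining an Iterator from each stream and building a resulting Spliterator that zips the i-th elements returned by the streams' iterators.

Basically, the resulting Spliterator can be built on a custom Iterator that simply iterates over the streams' Iterators and sums the elements returned by each next() call.

Here is an implementation: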

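@SafeVarargs
public static <T> Stream<T> zipStreams(BiFunction<? super T, ? super T, ? extends T> funZip, Stream<? extends T>... vetStreams) {
    //Nothing to zip: returning an empty stream rather than null, so callers can chain safely
    if (vetStreams == null || vetStreams.length == 0) {
        return Stream.empty();
    }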

    //Creating a List of Spliterator for each given stream
    List<Spliterator<? extends T>> listSliterators = new ArrayList<>();
    for (Stream<? extends T> s : vetStreams) {
        listSliterators.add(s.spliterator());
    }

    // Creating a final Spliterator built on the Spliterators of every stream.
    // The final Spliterator will be implemented with a custom Iterator which basically iterates every stream's iterator and "sum" their elements.
    // The sum is actually performed via the given function funZip to keep the data type generic.

    //Retrieving the common characteristics from the streams' spliterators
    int commonCharacteristics = listSliterators.get(0).characteristics();
    for (Spliterator<? extends T> spliterator : listSliterators) {
        commonCharacteristics = commonCharacteristics & spliterator.characteristics();
    }
    //zipping two streams also loses the distinct and sorted properties
    commonCharacteristics = commonCharacteristics & ~(Spliterator.DISTINCT | Spliterator.SORTED);

    //Retrieving the common minimum size in case streams of different lengths have been passed.
    //This parameter is necessary to instantiate the final Spliterator and create the resulting stream.
    long commonSize = -1;
    if ((commonCharacteristics & Spliterator.SIZED) != 0) {
        commonSize = listSliterators.stream().map(s -> s.getExactSizeIfKnown()).min(Comparator.naturalOrder()).orElse(-1L);
    }

    //Creating a list of iterators from the Spliterators created from the given streams
    List<Iterator<? extends T>> listIterators = new ArrayList<>();
    for (Spliterator<? extends T> spliterator : listSliterators) {
        listIterators.add(Spliterators.iterator(spliterator));
    }

    //Creating a result iterator built on the streams' iterators
    Iterator<? extends T> resIterator = new Iterator<>() {
        @Override
        public boolean hasNext() {
            //Returns true only if every iterator still has a next element
            return listIterators.stream().allMatch(Iterator::hasNext);
        }

        @Override
        public T next() {
            //Summing every Iterator's next() element
            T n = listIterators.get(0).next();
            for (int i = 1; i < listIterators.size(); i++) {
                n = funZip.apply(n, listIterators.get(i).next());
            }
            return n;
        }
    };

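    //The resulting stream is parallel only if every given stream is parallel
    boolean areAllStreamsParallel = Arrays.stream(vetStreams).allMatch(s -> s.isParallel());

    //Returning a stream built from a spliterator which is in turn built on the resulting iterator zipping every given stream's elements.
    //When the common size is unknown (-1), an unknown-size spliterator is used instead of passing a negative size.
    Spliterator<T> resSpliterator = commonSize >= 0
            ? Spliterators.spliterator(resIterator, commonSize, commonCharacteristics)
            : Spliterators.spliteratorUnknownSize(resIterator, commonCharacteristics);
    return StreamSupport.stream(resSpliterator, areAllStreamsParallel);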
}

Output

12
15
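
The output above presumably comes from zipping the three streams from the question with `Integer::sum` as the combining function and printing each result. The sketch below shows such a call. So that it runs on its own, it uses a condensed stand-in named `zipSketch` (a hypothetical name, not part of the answer) that is sequential only and always reports an unknown size. The string example is an added illustration of a different data type.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.function.BinaryOperator;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

public class ZipUsage {

    // Hypothetical, condensed stand-in for zipStreams: sequential, size unknown.
    @SafeVarargs
    static <T> Stream<T> zipSketch(BinaryOperator<T> combine, Stream<T>... streams) {
        if (streams.length == 0) {
            return Stream.empty();
        }
        List<Iterator<T>> iterators = new ArrayList<>();
        for (Stream<T> stream : streams) {
            iterators.add(stream.iterator());
        }
        Iterator<T> zipped = new Iterator<T>() {
            @Override
            public boolean hasNext() {
                // Stop as soon as the shortest stream is exhausted
                return iterators.stream().allMatch(Iterator::hasNext);
            }

            @Override
            public T next() {
                // Combine the next element of every stream, left to right
                T result = iterators.get(0).next();
                for (int i = 1; i < iterators.size(); i++) {
                    result = combine.apply(result, iterators.get(i).next());
                }
                return result;
            }
        };
        return StreamSupport.stream(
                Spliterators.spliteratorUnknownSize(zipped, Spliterator.ORDERED), false);
    }

    public static void main(String[] args) {
        // Prints 12 and 15 on separate lines
        zipSketch(Integer::sum, Stream.of(1, 2), Stream.of(4, 5), Stream.of(7, 8))
                .forEach(System.out::println);

        // The same helper applied to strings
        List<String> joined = zipSketch(String::concat, Stream.of("a", "b"), Stream.of("x", "y"))
                .collect(Collectors.toList());
        System.out.println(joined); // [ax, by]
    }
}
```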

Here is also a link for testing the code with different data types:

https://www.jdoodle.com/iembed/v0/rQr