How to reduce/iterate multiple Java streams of the same size at once?
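I have multiple Java streams of the same length, and I want to perform some operation on the corresponding elements of each stream. For example, add the first elements of all streams, then the second elements of all streams, then the third elements of all streams, and so on.
How can we do this without first reducing each stream?
For a minimal reproducible example, I have the following test snippet: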
@Test
void aggregateMultipleStreams() {
Stream<Integer> s1 = Stream.of(1, 2);
Stream<Integer> s2 = Stream.of(4, 5);
Stream<Integer> s3 = Stream.of(7, 8);
assertEquals(List.of(1 + 4 + 7, 2 + 5 + 8), aggregate(s1, s2, s3, 2));
}
I can write the aggregate method as follows, by first reducing (collecting) all the streams:
private List<Integer> aggregate(Stream<Integer> s1, Stream<Integer> s2, Stream<Integer> s3, int streamSize) {
final List<List<Integer>> reduced = Stream.of(s1, s2, s3)
.map(s -> s.collect(Collectors.toList())).collect(Collectors.toList());
return IntStream.range(0, streamSize).mapToObj(n -> IntStream.range(0, reduced.size())
.map(v -> reduced.get(v).get(n)).sum()).collect(Collectors.toList());
}
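But this could be a storage problem if each stream contains many records: for N records per stream, we need 3N storage here.
Is it possible to add the corresponding elements of different streams without reducing them first? Can we reduce multiple streams at once in Java?
After implementing @jb_dk's solution below, the solution snippet becomes: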
private List<Integer> aggregate(Stream<Integer> s1, Stream<Integer> s2, Stream<Integer> s3, int streamSize) {
final List<Iterator<Integer>> iterators = Stream.of(s1, s2, s3)
.map(Stream::iterator).collect(Collectors.toList());
return IntStream.range(0, streamSize).mapToObj(n -> IntStream.range(0, iterators.size())
.map(v -> iterators.get(v).next()).sum()).collect(Collectors.toList());
}
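To check the adapted snippet against the question's test outside JUnit, here is a minimal self-contained sketch. The class name `IteratorAggregate` and the varargs signature (instead of three named parameters) are assumptions for illustration. Like the snippet above, it trusts `streamSize`: if any stream is shorter, `Iterator.next()` throws `NoSuchElementException`.

```java
import java.util.Iterator;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;

class IteratorAggregate {
    // Hypothetical varargs generalization of aggregate(s1, s2, s3, streamSize).
    @SafeVarargs
    static List<Integer> aggregate(int streamSize, Stream<Integer>... streams) {
        // One iterator per stream; elements are pulled lazily, nothing is buffered per stream.
        final List<Iterator<Integer>> iterators = Stream.of(streams)
                .map(Stream::iterator)
                .collect(Collectors.toList());
        // For each position n, pull the next element from every iterator and sum them.
        // Assumes every stream has at least streamSize elements.
        return IntStream.range(0, streamSize)
                .mapToObj(n -> iterators.stream().mapToInt(Iterator::next).sum())
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<Integer> result = aggregate(2, Stream.of(1, 2), Stream.of(4, 5), Stream.of(7, 8));
        System.out.println(result); // prints [12, 15]
    }
}
```

Only the result list is materialized, so the extra storage is N sums rather than 3N collected elements.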
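Put the input streams in an array (or list) instead of 3 named variables, then create an output list or stream. Use an outer loop over the stream length, and an inner loop that iterates over the input streams, reads one element from each and adds it to the current output element.
Something like this (code untested, may contain syntax errors):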
...
{
    // Needs java.util.{ArrayList, Iterator, List} and java.util.stream.Stream
    List<Integer> results = new ArrayList<>(); // could be a Stream.Builder<Integer> instead
    final List<Stream<Integer>> instrms = List.of(s1, s2, s3);
    // Get the iterator (not rewindable) for each stream.
    // A List is used because generic arrays like new Iterator<Integer>[n] are not allowed.
    final List<Iterator<Integer>> initers = new ArrayList<>();
    for (Stream<Integer> instrm : instrms) {
        initers.add(instrm.iterator());
    }
    // Actually loop over the stream elements, outputting one
    // sum element for each element. Assumes all input streams
    // are the same length as the first one.
    while (initers.get(0).hasNext()) {
        int sum = 0;
        for (Iterator<Integer> initer : initers) {
            sum += initer.next();
        }
        results.add(sum);
    }
    return results; // results.build() if a stream builder
}
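The comments in the answer mention using a stream builder instead of a list. A minimal sketch of that variant follows; the class name `BuilderAggregate` and the varargs signature are assumptions. It keeps the answer's assumption that all streams are as long as the first one, and it requires at least one stream. Note that `Stream.Builder` still buffers every sum until `build()` is called, so the result is not lazy; it only avoids collecting the 3N input elements.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

class BuilderAggregate {
    // Hypothetical helper: the answer's nested loop, emitting sums into a Stream.Builder.
    @SafeVarargs
    static Stream<Integer> aggregate(Stream<Integer>... streams) {
        List<Iterator<Integer>> iterators = new ArrayList<>();
        for (Stream<Integer> s : streams) {
            iterators.add(s.iterator());
        }
        Stream.Builder<Integer> results = Stream.builder();
        // Stops when the first stream is exhausted (requires at least one stream).
        while (iterators.get(0).hasNext()) {
            int sum = 0;
            for (Iterator<Integer> it : iterators) {
                sum += it.next();
            }
            results.add(sum);
        }
        return results.build();
    }

    public static void main(String[] args) {
        List<Integer> sums = aggregate(Stream.of(1, 2), Stream.of(4, 5), Stream.of(7, 8))
                .collect(Collectors.toList());
        System.out.println(sums); // prints [12, 15]
    }
}
```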
This is basically a zip operation, where the element at position i of each stream is added to the corresponding i-th element of the other streams.
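To illustrate the zip idea in its simplest form, here is a sketch of a binary zip; the helper name `zip` and class `BinaryZip` are hypothetical, not part of any JDK API. It wraps the two streams' iterators in a combining iterator and turns it back into a stream with `Spliterators.spliteratorUnknownSize` and `StreamSupport.stream`. Because addition is associative, nesting two binary zips is enough for three streams; elements still flow through one at a time.

```java
import java.util.Iterator;
import java.util.List;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.function.BinaryOperator;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

class BinaryZip {
    // Hypothetical helper: lazily combines the i-th elements of two streams,
    // stopping as soon as either stream runs out.
    static <T> Stream<T> zip(Stream<T> a, Stream<T> b, BinaryOperator<T> op) {
        Iterator<T> ia = a.iterator();
        Iterator<T> ib = b.iterator();
        Iterator<T> zipped = new Iterator<T>() {
            @Override
            public boolean hasNext() {
                return ia.hasNext() && ib.hasNext();
            }

            @Override
            public T next() {
                return op.apply(ia.next(), ib.next());
            }
        };
        return StreamSupport.stream(
                Spliterators.spliteratorUnknownSize(zipped, Spliterator.ORDERED), false);
    }

    public static void main(String[] args) {
        // (s1 zip s2) zip s3, each step adding corresponding elements.
        List<Integer> sums = zip(zip(Stream.of(1, 2), Stream.of(4, 5), Integer::sum),
                Stream.of(7, 8), Integer::sum).collect(Collectors.toList());
        System.out.println(sums); // prints [12, 15]
    }
}
```

Each level of nesting adds one iterator wrapper, so for many streams an n-ary version that holds all iterators in one list, as in the implementation that follows, is simpler.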
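You can achieve what you need by obtaining an Iterator from each stream and building a resulting Spliterator that zips every i-th element returned by the streams' iterators.
Basically, the resulting Spliterator can be built on a custom Iterator that simply iterates over the streams' Iterators and sums every element returned by their next().
Here is an implementation: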
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.function.BiFunction;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

@SafeVarargs
public static <T> Stream<T> zipStreams(BiFunction<? super T, ? super T, ? extends T> funZip, Stream<? extends T>... vetStreams) {
    if (vetStreams == null || vetStreams.length == 0) {
        return Stream.empty();
    }
    // The resulting stream is parallel only if every given stream is parallel.
    // Checked before spliterator() is called, since that is a terminal operation.
    boolean allStreamsParallel = Arrays.stream(vetStreams).allMatch(Stream::isParallel);
    // Creating a List of Spliterators, one for each given stream
    List<Spliterator<? extends T>> listSpliterators = new ArrayList<>();
    for (Stream<? extends T> s : vetStreams) {
        listSpliterators.add(s.spliterator());
    }
    // The final Spliterator is built on a custom Iterator which iterates every stream's iterator and "sums" their elements.
    // The "sum" is actually performed by the given function funZip, to keep the data type generic.
    // Retrieving the characteristics common to all the streams' spliterators
    int commonCharacteristics = listSpliterators.get(0).characteristics();
    for (Spliterator<? extends T> spliterator : listSpliterators) {
        commonCharacteristics &= spliterator.characteristics();
    }
    // Zipping streams loses the DISTINCT and SORTED properties
    commonCharacteristics &= ~(Spliterator.DISTINCT | Spliterator.SORTED);
    // Retrieving the minimum size, in case streams of different lengths have been passed (-1 = unknown)
    long commonSize = -1;
    if ((commonCharacteristics & Spliterator.SIZED) != 0) {
        commonSize = listSpliterators.stream().mapToLong(Spliterator::getExactSizeIfKnown).min().orElse(-1L);
    }
    // Creating a list of iterators from the streams' Spliterators
    List<Iterator<? extends T>> listIterators = new ArrayList<>();
    for (Spliterator<? extends T> spliterator : listSpliterators) {
        listIterators.add(Spliterators.iterator(spliterator));
    }
    // Creating a result iterator built on the streams' iterators
    Iterator<T> resIterator = new Iterator<T>() {
        @Override
        public boolean hasNext() {
            // false as soon as any stream is exhausted
            return listIterators.stream().allMatch(Iterator::hasNext);
        }
        @Override
        public T next() {
            // Combining every iterator's next() element with funZip
            T n = listIterators.get(0).next();
            for (int i = 1; i < listIterators.size(); i++) {
                n = funZip.apply(n, listIterators.get(i).next());
            }
            return n;
        }
    };
    // Spliterators.spliterator(iterator, size, ...) always reports SIZED, so use it only when the size is known
    Spliterator<T> resSpliterator = commonSize >= 0
            ? Spliterators.spliterator(resIterator, commonSize, commonCharacteristics)
            : Spliterators.spliteratorUnknownSize(resIterator, commonCharacteristics);
    // Returning a stream built on the iterator that zips every given stream's elements
    return StreamSupport.stream(resSpliterator, allStreamsParallel);
}
Output:
12
15
Here is also a link to test the code with different data types: