流状态计算:累计和
Stream stateful computation: cumulative sums
假设我有一个 Java IntStream,是否可以将它转换为具有累加和的 IntStream?例如,以 [4, 2, 6, ...] 开头的流应转换为 [4, 6, 12, ...]。
更一般地说,应该如何实现有状态流操作?感觉应该可以这样:
myIntStream.map(new Function<Integer, Integer> {
int sum = 0;
Integer apply(Integer value){
return sum += value;
}
);
明显的限制是这仅适用于顺序流。但是,Stream.map 明确要求无状态映射函数。我错过了 Stream.statefulMap 或 Stream.cumulative 操作是对的还是错过了 Java 流的要点?
例如与 Haskell 比较,其中 scanl1 函数正好解决了这个例子:
scanl1 (+) [1 2 3 4] = [1 3 6 10]
可以使用收集器创建新流:
class Accumulator {
public static void accept(List<Integer> list, Integer value) {
list.add(value + (list.isEmpty() ? 0 : list.get(list.size() - 1)));
}
public static List<Integer> combine(List<Integer> list1, List<Integer> list2) {
int total = list1.get(list1.size() - 1);
list2.stream().map(n -> n + total).forEach(list1::add);
return list1;
}
}
这用作:
myIntStream.parallel()
.collect(ArrayList<Integer>::new, Accumulator::accept, Accumulator::combine)
.stream();
希望您能看到这个收集器的重要属性是,即使流是并行的,因为 Accumulator
实例组合在一起,它也会调整总数。
这显然不如映射操作高效,因为它收集整个流,然后生成一个新流。但这不仅仅是一个实现细节:它是流旨在潜在并发处理这一事实的必要功能。
我已经用 IntStream.range(0, 10000).parallel()
测试过,它运行正常。
你可以用原子序数来做到这一点。例如:
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.IntStream;
import java.util.stream.LongStream;
public class Accumulator {
public static LongStream toCumulativeSumStream(IntStream ints){
AtomicLong sum = new AtomicLong(0);
return ints.sequential().mapToLong(sum::addAndGet);
}
public static void main(String[] args){
LongStream sums = Accumulator.toCumulativeSumStream(IntStream.range(1, 5));
sums.forEachOrdered(System.out::println);
}
}
这输出:
1
3
6
10
我使用 Long 来存储总和,因为两个 int 加起来完全有可能超过 Integer.MAX_VALUE
,而 long 溢出的可能性较小。
假设我有一个 Java IntStream,是否可以将它转换为具有累加和的 IntStream?例如,以 [4, 2, 6, ...] 开头的流应转换为 [4, 6, 12, ...]。
更一般地说,应该如何实现有状态流操作?感觉应该可以这样:
myIntStream.map(new Function<Integer, Integer> {
int sum = 0;
Integer apply(Integer value){
return sum += value;
}
);
明显的限制是这仅适用于顺序流。但是,Stream.map 明确要求无状态映射函数。我错过了 Stream.statefulMap 或 Stream.cumulative 操作是对的还是错过了 Java 流的要点?
例如与 Haskell 比较,其中 scanl1 函数正好解决了这个例子:
scanl1 (+) [1 2 3 4] = [1 3 6 10]
可以使用收集器创建新流:
class Accumulator {
public static void accept(List<Integer> list, Integer value) {
list.add(value + (list.isEmpty() ? 0 : list.get(list.size() - 1)));
}
public static List<Integer> combine(List<Integer> list1, List<Integer> list2) {
int total = list1.get(list1.size() - 1);
list2.stream().map(n -> n + total).forEach(list1::add);
return list1;
}
}
这用作:
myIntStream.parallel()
.collect(ArrayList<Integer>::new, Accumulator::accept, Accumulator::combine)
.stream();
希望您能看到这个收集器的重要属性是,即使流是并行的,因为 Accumulator
实例组合在一起,它也会调整总数。
这显然不如映射操作高效,因为它收集整个流,然后生成一个新流。但这不仅仅是一个实现细节:它是流旨在潜在并发处理这一事实的必要功能。
我已经用 IntStream.range(0, 10000).parallel()
测试过,它运行正常。
你可以用原子序数来做到这一点。例如:
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.IntStream;
import java.util.stream.LongStream;
public class Accumulator {
public static LongStream toCumulativeSumStream(IntStream ints){
AtomicLong sum = new AtomicLong(0);
return ints.sequential().mapToLong(sum::addAndGet);
}
public static void main(String[] args){
LongStream sums = Accumulator.toCumulativeSumStream(IntStream.range(1, 5));
sums.forEachOrdered(System.out::println);
}
}
这输出:
1
3
6
10
我使用 Long 来存储总和,因为两个 int 加起来完全有可能超过 Integer.MAX_VALUE
,而 long 溢出的可能性较小。