Deep "flatReduce" for WebFlux?
I have defined an operator, flatReduce(), which is to reduce() what flatMap() is to map():
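    import java.util.function.BiFunction;
    import java.util.stream.IntStream;
    import reactor.core.publisher.Mono;
    import static java.util.stream.Collectors.toList;

    public class FlatReduce {
        public static <V, W> Mono<V> flatReduce(Mono<V> initial, Iterable<W> items, BiFunction<V, W, Mono<V>> accumulator) {
            // Each iteration wraps the previous Mono in one more flatMap,
            // so the operator chain ends up as deep as the number of items.
            for (W item : items) {
                initial = initial.flatMap(v -> accumulator.apply(v, item));
            }
            return initial;
        }

        public static void main(String[] args) {
            // 4000 items are enough to trigger the stack overflow described below.
            flatReduce(Mono.just(1), IntStream.range(0, 4000).mapToObj(it -> it).collect(toList()), (a, b) -> Mono.just(a + b)).block();
        }
    }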
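This produces deeply nested flatMaps, and I observe a stack overflow. Is there any way to work around this (perhaps by turning it into continuation-passing style)?
Thanks.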
Found a workaround by converting the Mono to a CompletableFuture.
    @Test
    public void runTest() {
        System.out.println(flatReduce(Mono.just(1), IntStream.range(1, 500000).mapToObj(it -> it).collect(toList()), (a, b) -> Mono.just(a + 1)).block());
    }

    private static <V, W> Mono<V> flatReduce(Mono<V> initial, List<W> items, BiFunction<V, W, Mono<V>> accumulator) {
        // Note: toFuture() subscribes to the Mono right away, so the fold starts eagerly.
        return Mono.fromCompletionStage(flatReduceWithFuture(initial.toFuture(), items, (v, w) -> accumulator.apply(v, w).toFuture()));
    }

    private static <V, W> CompletableFuture<V> flatReduceWithFuture(CompletableFuture<V> initial, List<W> items, BiFunction<V, W, CompletableFuture<V>> accumulator) {
        // Fold the items into the future one thenCompose step at a time.
        for (W item : items) {
            initial = initial.thenCompose(x -> accumulator.apply(x, item));
        }
        return initial;
    }
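A likely reason the workaround is stack-safe in this test: every future involved is already complete when thenCompose is called, so each accumulator step runs right there inside the for loop instead of being stacked up for later. The sketch below isolates the CompletableFuture fold with plain JDK classes to show this; the class name FutureFold and the call counter are illustrative additions, not part of the original answer. It does not cover the case where a step completes later on another thread.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.BiFunction;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

public class FutureFold {
    // Same fold as flatReduceWithFuture above, reproduced so the sketch compiles on its own.
    public static <V, W> CompletableFuture<V> flatReduceWithFuture(
            CompletableFuture<V> initial, List<W> items,
            BiFunction<V, W, CompletableFuture<V>> accumulator) {
        CompletableFuture<V> acc = initial;
        for (W item : items) {
            // If acc is already complete, the lambda is invoked immediately on this thread.
            acc = acc.thenCompose(v -> accumulator.apply(v, item));
        }
        return acc;
    }

    public static void main(String[] args) {
        List<Integer> items = IntStream.range(1, 500000).boxed().collect(Collectors.toList());
        int[] calls = {0}; // hypothetical counter, only here to observe when the steps run

        CompletableFuture<Integer> result = flatReduceWithFuture(
                CompletableFuture.completedFuture(1), items,
                (a, b) -> {
                    calls[0]++;
                    return CompletableFuture.completedFuture(a + 1);
                });

        // The loop has already executed every step, so the result is done before join().
        System.out.println(calls[0] + " " + result.isDone() + " " + result.join());
        // prints "499999 true 500000"
    }
}
```

Because the work happens eagerly in the loop, the returned future is only a carrier for a value that has already been computed, which is a different execution model from the lazy Mono chain in the question.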