如何将组合函数转换为反应式管道?
How to convert composed Functions into a reactive pipeline?
我目前有一个定义为 java.util.function.Function
组合的处理管道,我想在反应上下文中使用它,例如用 Flux#transform
调用它。我应该如何修改 pipeline()
的签名,使其适合作为 Flux#transform
的参数?
Function<A, C> pipeline(UnaryOperator<A> f1,
Function<A, B> f2_io,
UnaryOperator<B> f3,
UnaryOperator<B> f4_io,
UnaryOperator<B> f5,
Function<B, C> f6_io) {
return f1.andThen(f2_io).andThen(f3).andThen(f4_io).andThen(f5).andThen(f6_io);
}
第一次转换尝试可能只是将 returned 类型从 Function<A, C>
更改为 Function<Flux<A>, Flux<C>>
:
Function<Flux<A>, Flux<C>> pipeline(UnaryOperator<A> f1,
Function<A, B> f2_io,
UnaryOperator<B> f3,
UnaryOperator<B> f4_io,
UnaryOperator<B> f5,
Function<B, C> f6_io) {
return f -> f.map(f1).map(f2_io).map(f3).map(f4_io).map(f5).map(f6_io);
}
这里最大的优点是 f1
..f6
的签名不需要改变。如果这些函数中的 none 执行 I/O 操作就好了,但是,正如它们的后缀提示,f2_io
、f4_io
和 f6_io
可能会阻塞I/O操作,所以我觉得应该returnMono
s.
如果我们将 Function<A, B> f2_io
更改为 Function<A, Mono<B>> f2_io
,我们将在第二个 map()
之后得到一个 Flux<Mono<B>>
。下面函数的returned类型应该怎么改?我可以避免将 UnaryOperator<B> f3
更改为 UnaryOperator<Mono<B>> f3
吗?
map
以同步方式转换元素,换句话说,在每个操作上使用映射块,直到评估完成。所以 map
不是阻塞 io 的合适运算符。异步 flatMap
运算符系列将是这里的正确选择。
如果一个函数本身不返回反应类型,你总是可以将函数包装在Mono
中以在不改变函数的情况下实现异步执行:
Function<Flux<A>, Flux<C>> pipeline(UnaryOperator<A> f1,
Function<A, B> f2_io,
UnaryOperator<B> f3,
UnaryOperator<B> f4_io,
UnaryOperator<B> f5,
Function<B, C> f6_io) {
return f -> f.map(f1)
.flatMap(v -> Mono.fromCallable(() -> f2_io.apply(v)))
.map(f3)
.flatMap(v -> Mono.fromCallable(() -> f4_io.apply(v)))
.map(f5)
.flatMap(v -> Mono.fromCallable(() -> f6_io.apply(v)));
}
顺便说一句,有一个很棒的工具 BlockHound 可以检测反应性管道中非阻塞调度程序的阻塞调用。
我目前有一个定义为 java.util.function.Function
组合的处理管道,我想在反应上下文中使用它,例如用 Flux#transform
调用它。我应该如何修改 pipeline()
的签名,使其适合作为 Flux#transform
的参数?
Function<A, C> pipeline(UnaryOperator<A> f1,
Function<A, B> f2_io,
UnaryOperator<B> f3,
UnaryOperator<B> f4_io,
UnaryOperator<B> f5,
Function<B, C> f6_io) {
return f1.andThen(f2_io).andThen(f3).andThen(f4_io).andThen(f5).andThen(f6_io);
}
第一次转换尝试可能只是将 returned 类型从 Function<A, C>
更改为 Function<Flux<A>, Flux<C>>
:
Function<Flux<A>, Flux<C>> pipeline(UnaryOperator<A> f1,
Function<A, B> f2_io,
UnaryOperator<B> f3,
UnaryOperator<B> f4_io,
UnaryOperator<B> f5,
Function<B, C> f6_io) {
return f -> f.map(f1).map(f2_io).map(f3).map(f4_io).map(f5).map(f6_io);
}
这里最大的优点是 f1
..f6
的签名不需要改变。如果这些函数中的 none 执行 I/O 操作就好了,但是,正如它们的后缀提示,f2_io
、f4_io
和 f6_io
可能会阻塞I/O操作,所以我觉得应该returnMono
s.
如果我们将 Function<A, B> f2_io
更改为 Function<A, Mono<B>> f2_io
,我们将在第二个 map()
之后得到一个 Flux<Mono<B>>
。下面函数的returned类型应该怎么改?我可以避免将 UnaryOperator<B> f3
更改为 UnaryOperator<Mono<B>> f3
吗?
map
以同步方式转换元素,换句话说,在每个操作上使用映射块,直到评估完成。所以 map
不是阻塞 io 的合适运算符。异步 flatMap
运算符系列将是这里的正确选择。
如果一个函数本身不返回反应类型,你总是可以将函数包装在Mono
中以在不改变函数的情况下实现异步执行:
Function<Flux<A>, Flux<C>> pipeline(UnaryOperator<A> f1,
Function<A, B> f2_io,
UnaryOperator<B> f3,
UnaryOperator<B> f4_io,
UnaryOperator<B> f5,
Function<B, C> f6_io) {
return f -> f.map(f1)
.flatMap(v -> Mono.fromCallable(() -> f2_io.apply(v)))
.map(f3)
.flatMap(v -> Mono.fromCallable(() -> f4_io.apply(v)))
.map(f5)
.flatMap(v -> Mono.fromCallable(() -> f6_io.apply(v)));
}
顺便说一句,有一个很棒的工具 BlockHound 可以检测反应性管道中非阻塞调度程序的阻塞调用。