如何将组合函数转换为反应式管道?

How to convert composed Functions into a reactive pipeline?

我目前有一个定义为 java.util.function.Function 组合的处理管道,我想在反应上下文中使用它,例如用 Flux#transform 调用它。我应该如何修改 pipeline() 的签名,使其适合作为 Flux#transform 的参数?

Function<A, C> pipeline(UnaryOperator<A> f1,
    Function<A, B> f2_io,
    UnaryOperator<B> f3,
    UnaryOperator<B> f4_io,
    UnaryOperator<B> f5,
    Function<B, C> f6_io) {
  return f1.andThen(f2_io).andThen(f3).andThen(f4_io).andThen(f5).andThen(f6_io);
}

第一次转换尝试可能只是将 returned 类型从 Function<A, C> 更改为 Function<Flux<A>, Flux<C>>:

Function<Flux<A>, Flux<C>> pipeline(UnaryOperator<A> f1,
    Function<A, B> f2_io,
    UnaryOperator<B> f3,
    UnaryOperator<B> f4_io,
    UnaryOperator<B> f5,
    Function<B, C> f6_io) {
  return f -> f.map(f1).map(f2_io).map(f3).map(f4_io).map(f5).map(f6_io);
}

这里最大的优点是 f1..f6 的签名不需要改变。如果这些函数中的 none 执行 I/O 操作就好了,但是,正如它们的后缀提示,f2_iof4_iof6_io 可能会阻塞I/O操作,所以我觉得应该returnMonos.

如果我们将 Function<A, B> f2_io 更改为 Function<A, Mono<B>> f2_io,我们将在第二个 map() 之后得到一个 Flux<Mono<B>>。下面函数的returned类型应该怎么改?我可以避免将 UnaryOperator<B> f3 更改为 UnaryOperator<Mono<B>> f3 吗?

map 以同步方式转换元素,换句话说,在每个操作上使用映射块,直到评估完成。所以 map 不是阻塞 io 的合适运算符。异步 flatMap 运算符系列将是这里的正确选择。

如果一个函数本身不返回反应类型,你总是可以将函数包装在Mono中以在不改变函数的情况下实现异步执行:

Function<Flux<A>, Flux<C>> pipeline(UnaryOperator<A> f1,
                                            Function<A, B> f2_io,
                                            UnaryOperator<B> f3,
                                            UnaryOperator<B> f4_io,
                                            UnaryOperator<B> f5,
                                            Function<B, C> f6_io) {
    return f -> f.map(f1)
            .flatMap(v -> Mono.fromCallable(() -> f2_io.apply(v)))
            .map(f3)
            .flatMap(v -> Mono.fromCallable(() -> f4_io.apply(v)))
            .map(f5)
            .flatMap(v -> Mono.fromCallable(() -> f6_io.apply(v)));
}

顺便说一句,有一个很棒的工具 BlockHound 可以检测反应性管道中非阻塞调度程序的阻塞调用。