如何使用组合器函数压缩 8 个以上的单声道

How to zip more than 8 mono using a combinator function

我正在通过为作为先前 Web 服务调用结果返回的每个项目调用多个 Web 服务来丰富数据(即 - 散开)。

我正在使用 Kotlin、Spring Boot 2 和新的响应式 WebClient。

在这个片段中,我只展示了散布到 web 服务的代码,但实际上这是在一个更大的管道的末端。

以前我一直在使用 Mono.zip(t1, t2, t3) 接口,它支持最多传递 8 个值和 returns 具有正确类型的元组。

定义:

    public static <T1, T2, T3, T4, T5, T6, T7, T8> Mono<Tuple8<T1, T2, T3, T4, T5, T6, T7, T8>> zip(Mono<? extends T1> p1,
            Mono<? extends T2> p2,
            Mono<? extends T3> p3,
            Mono<? extends T4> p4,
            Mono<? extends T5> p5,
            Mono<? extends T6> p6,
            Mono<? extends T7> p7,
            Mono<? extends T8> p8) {
        return onAssembly(new MonoZip(false, a -> Tuples.fromArray((Object[])a), p1, p2, p3, p4, p5, p6, p7, p8));
    }

我现在有超过 8 个服务要调用,所以我正在查看 Mono.zip(combinatorFn, ... monos) 组合调用结果的方法。

定义:

    public static <R> Mono<R> zip(Function<? super Object[], ? extends R> combinator, Mono<?>... monos) {
        if (monos.length == 0) {
            return empty();
        }
        if (monos.length == 1) {
            return monos[0].map(d -> combinator.apply(new Object[]{d}));
        }
        return onAssembly(new MonoZip<>(false, combinator, monos));
    }

我使用 KOTLIN 的问题 我在调用站点定义组合器函数时遇到困难。这是我目前所拥有的:

val tst: (Array<Any>) -> Mono<AggregateReport> = { it -> Mono.just(AggregateReport("Test")) }


val res = Mono.zip(
        tst,
        Client.callThatReturnsAMono(),
        Client.callThatReturnsAMono(),
        Client.callThatReturnsAMono(),
        Client.callThatReturnsAMono(),
        Client.callThatReturnsAMono(),
        Client.callThatReturnsAMono(),
        Client.callThatReturnsAMono(),
        Client.callThatReturnsAMono(),
        Client.callThatReturnsAMono()
)

显然,我需要增强组合器才能真正处理输入数组并生成聚合。但是代码目前无法编译。

编译错误:

Error:(339, 41) Kotlin: Type mismatch: inferred type is (Array<Any>) -> Mono<ConsoleApplication.AggregateReport> but Function<in Array<(out) Any!>!, out (???..???)> was expected

有什么建议吗?

谢谢!

在纯 kotlin 中你不能这样做。

您在问题描述中提出的方法 zip 的签名需要 Function 类型,即 java 类型。

这个(kotlin 之一):

    val kotlin: (List<String>) -> String = { array: List<String> -> "kotlin" }

不等同于 (java):

    Function<List<String>, String> java = array -> "java"

您可以做的是:

import reactor.core.publisher.Mono
import java.util.function.Function

...

fun main() {
    val tst = Function<Array<Any>, Mono<AggregateReport>> { Mono.just(AggregateReport("Test")) }

    Mono.zip(
        tst,
        Client.callThatReturnsAMono(),
        Client.callThatReturnsAMono(),
        Client.callThatReturnsAMono(),
        Client.callThatReturnsAMono(),
        Client.callThatReturnsAMono(),
        Client.callThatReturnsAMono(),
        Client.callThatReturnsAMono(),
        Client.callThatReturnsAMono(),
        Client.callThatReturnsAMono(),
        Client.callThatReturnsAMono()
    )
}

您必须实际使用 java.util.function.Function 类型,但 Kotlin 有助于使外观更漂亮。

经过一些额外的研究,我发现有 Kotlin Extensions for Reactor 提供了一个 zip 函数和一个更好的方法签名来组合 N monos。

IE - fun <R> zip(vararg monos: Mono<*>, combinator: (Array<*>) -> R): Mono<R>

通过这种方式,我能够提供一个需要数组并返回单声道的组合器。我实现了一个通用组合器,它可以检查传入响应的类型并将断言键入聚合的各种属性的正确子类。

https://github.com/reactor/reactor-kotlin-extensions