Spring Reactor:Mono.zip 在空单声道上失败

Spring Reactor: Mono.zip fails on empty Mono

我正在使用 Spring Reactor 3.1.0.M3 并且有一个用例需要合并来自多个源的 Mono。我发现如果其中一个 Monos 是空的 Mono,则 zip 会失败而不会出现错误。

示例:

Mono<String> m1 = Mono.just("A");
Mono<String> m2 = Mono.just("B");
Mono<String> m3 = Mono.empty();

Mono<String> combined = Mono.zip(strings -> {
    StringBuffer sb = new StringBuffer();
    for (Object string : strings) {
        sb.append((String) string);
    }
    return sb.toString();
}, m1, m2, m3);
System.out.println("Combined " + combined.block());

添加 m3 时,在响应为 null 时跳过组合器。当我删除 m3 时,一切都按预期工作,并返回 "AB"。 有没有办法通过检测空的 Mono 来处理这个问题? 另外,有没有办法让组合器方法知道对象的类型而不是强制转换?

zip 运算符不是这样的。这实际上是违反直觉的:您的代码需要一个包含 3 个元素的元组,而您只得到两个?!?

在这种情况下,您处于控制之中,只有您可以决定什么是好的默认值(如果提供了 none(请记住,null 值是反应流规范所禁止的)。

Mono<String> m1 = Mono.just("A");
Mono<String> m2 = Mono.just("B");
Mono<String> m3 = Mono.empty().defaultIfEmpty("");

Mono<String> combined = Mono.when(m1, m2, m3).map(t -> {
    StringBuffer sb = new StringBuffer();
    sb.append(t.getT1());
    sb.append(t.getT2());
    sb.append(t.getT3());
    return sb.toString();
});

编辑

您似乎对 Publisher 类型的性质感到困惑,请参阅:

if one of the Monos is an empty Mono, zip fails without an error

So if I was to try and zip Mono's and for some reason one is empty, the zip would fail and I cannot seem to put in any code to safeguard against that

Mono 不是失败案例:它只是没有发出任何值并且成功完成。您可以通过更改代码示例来验证:

    combined.subscribe(
            s -> System.out.println("element: " + s), // doesn't execute
            s -> System.out.println("error: " + s), // doesn't execute
            () -> { System.out.println("complete!"); // prints
    });

因此,根据您的要求,您可以:

  • 在这 3 个 Mono 实例上应用 defaultIfEmpty 运算符,如果有方便的默认值,您可以依赖
  • 在组合的 Mono 上应用 defaultIfEmpty 运算符,使用默认值,甚至将其转换为带有 combined.switchIfEmpty(Mono.error(...))
  • 的错误消息

Mono.zip

任何源的错误或空完成将分别导致其他源被取消,并且生成的 Mono 立即出错或完成。

当一个 Mono 源完成后没有任何价值,如果您还想继续执行其他源,那么 Mono.zipDelayError 是您的最佳选择。

String 的情况下,很容易为空的情况定义默认值,这很好地解决了问题,如 Brian 的回答中所述。但是,对于其他自定义类型,由于某种原因可能难以创建空对象。这些情况的替代方法是使用 Optional。不过,此解决方案有一些繁重的样板文件。

Mono<Optional<String>> m1 = Mono.just("A").map(Optional::of).defaultIfEmpty(Optional.empty());
Mono<Optional<String>> m2 = Mono.just("B").map(Optional::of).defaultIfEmpty(Optional.empty());
Mono<Optional<String>> m3 = Mono.<String>empty().map(Optional::of).defaultIfEmpty(Optional.empty());

Mono<String> combined = Mono.zip(strings -> {
    StringBuffer sb = new StringBuffer();
    for (Object string : strings) {
        ((Optional<String>) string).ifPresent(sb::append);
    }
    return sb.toString();
}, m1, m2, m3);
System.out.println("Combined " + combined.block());

应该是Mono.zip:

Mono<String> myStrings = Mono.zip(monoA, monoB)
  .map(tuple -> {
    return new StringBuffer()
      .append(t.getT1());
      .append(t.getT2());
});

也可以通过 Mono.just(Optional.empty()).

而不是禁止 nullMono.empty() 通过 zip()

完全公开,我的示例基本上是 Brian Clozel 的示例,但进行了少量编辑以突出显示其他数据类型。

Mono<String> m1 = Mono.just("A");
Mono<String> m2 = Mono.just("B");
Mono<String> m3 = Mono.fromSupplier(() -> new String());

Mono<String> combined = Mono.zip(m1, m2, m3).map(t -> {
    StringBuffer sb = new StringBuffer();
    sb.append(t.getT1());
    sb.append(t.getT2());
    sb.append(t.getT3());
    return sb.toString();
});

也就是说,zip 将忽略为空的条目,即 Mono.empty。所以要完成它,你需要有一个非空值,在本例中是一个空字符串,但是一个新的数据类型实例供你稍后在代码中检查也是好的