首次发射后,不要终止 "firstWithSignal" 内剩余的 Mono

Do not terminate the remaining Mono's inside the "firstWithSignal" after first emit

我有一个 API return 错误响应非常快,当它 - 例如 - 找不到该项目时,但成功响应大约需要 5 秒。 我想 return 用户回复我们收到的错误代码,但我也不能有 5 秒的等待时间。 所以我打算解决这个问题的方式是这样的:

 Mono.firstWithSignal(
    Mono.delay(Duration.ofSeconds(1)).thenReturn(HttpStatus.ACCEPTED),
    sendRequest()
 );

我想要实现的是无论花费多少时间都执行请求,但如果花费的时间比一秒长 - 用 HttpStatus.ACCEPTED.

响应客户端

一旦第一个 Mono 发出信号,Reactor 就会终止任何剩余的 Mono 的问题。

我不太喜欢这个解决方案,感觉有点老套,但它确实有效。 如果缓存 Mono 的结果 - 它不能再被取消。您也可以使用 share 运算符,但我认为 cache 更适合这里。

 @Test
public void test() {
    var result = Mono.firstWithSignal(
            Mono.delay(Duration.ofSeconds(3))
                    .doOnNext((val) -> System.out.println("HEEEREREEEE"))
                    .thenReturn(3)
                    .cache(),
            Mono.delay(Duration.ofSeconds(1)).thenReturn(1)
    ).block();
    System.out.println(result);
    Mono.delay(Duration.ofSeconds(4)).block();
    System.out.println("DONE");
}

您可以同时订阅 Mono 并尝试将值发送到 Sinks.One


Sinks.One<HttpStatus> sink = Sinks.one();

// subscribe to the request
request().subscribe(sink::tryEmitValue, sink::tryEmitError);

// subscribe to the timeout
Mono.just(HttpStatus.ACCEPTED).delayElement(Duration.ofSeconds(1))
                              .subscribe(sink::tryEmitValue);

// subscribe to the sink
sink.asMono()
    .subscribe(status -> {
      System.out.println("Success with status : " + status);
    }, error -> {
      System.out.println("Error : " + error.getMessage());
    });