首次发射后,不要终止 "firstWithSignal" 内剩余的 Mono
Do not terminate the remaining Mono's inside the "firstWithSignal" after first emit
我有一个 API return 错误响应非常快,当它 - 例如 - 找不到该项目时,但成功响应大约需要 5 秒。
我想 return 用户回复我们收到的错误代码,但我也不能有 5 秒的等待时间。
所以我打算解决这个问题的方式是这样的:
Mono.firstWithSignal(
Mono.delay(Duration.ofSeconds(1)).thenReturn(HttpStatus.ACCEPTED),
sendRequest()
);
我想要实现的是无论花费多少时间都执行请求,但如果花费的时间比一秒长 - 用 HttpStatus.ACCEPTED.
响应客户端
一旦第一个 Mono 发出信号,Reactor 就会终止任何剩余的 Mono 的问题。
我不太喜欢这个解决方案,感觉有点老套,但它确实有效。
如果缓存 Mono 的结果 - 它不能再被取消。您也可以使用 share
运算符,但我认为 cache
更适合这里。
@Test
public void test() {
var result = Mono.firstWithSignal(
Mono.delay(Duration.ofSeconds(3))
.doOnNext((val) -> System.out.println("HEEEREREEEE"))
.thenReturn(3)
.cache(),
Mono.delay(Duration.ofSeconds(1)).thenReturn(1)
).block();
System.out.println(result);
Mono.delay(Duration.ofSeconds(4)).block();
System.out.println("DONE");
}
您可以同时订阅 Mono
并尝试将值发送到 Sinks.One
Sinks.One<HttpStatus> sink = Sinks.one();
// subscribe to the request
request().subscribe(sink::tryEmitValue, sink::tryEmitError);
// subscribe to the timeout
Mono.just(HttpStatus.ACCEPTED).delayElement(Duration.ofSeconds(1))
.subscribe(sink::tryEmitValue);
// subscribe to the sink
sink.asMono()
.subscribe(status -> {
System.out.println("Success with status : " + status);
}, error -> {
System.out.println("Error : " + error.getMessage());
});
我有一个 API return 错误响应非常快,当它 - 例如 - 找不到该项目时,但成功响应大约需要 5 秒。 我想 return 用户回复我们收到的错误代码,但我也不能有 5 秒的等待时间。 所以我打算解决这个问题的方式是这样的:
Mono.firstWithSignal(
Mono.delay(Duration.ofSeconds(1)).thenReturn(HttpStatus.ACCEPTED),
sendRequest()
);
我想要实现的是无论花费多少时间都执行请求,但如果花费的时间比一秒长 - 用 HttpStatus.ACCEPTED.
响应客户端一旦第一个 Mono 发出信号,Reactor 就会终止任何剩余的 Mono 的问题。
我不太喜欢这个解决方案,感觉有点老套,但它确实有效。
如果缓存 Mono 的结果 - 它不能再被取消。您也可以使用 share
运算符,但我认为 cache
更适合这里。
@Test
public void test() {
var result = Mono.firstWithSignal(
Mono.delay(Duration.ofSeconds(3))
.doOnNext((val) -> System.out.println("HEEEREREEEE"))
.thenReturn(3)
.cache(),
Mono.delay(Duration.ofSeconds(1)).thenReturn(1)
).block();
System.out.println(result);
Mono.delay(Duration.ofSeconds(4)).block();
System.out.println("DONE");
}
您可以同时订阅 Mono
并尝试将值发送到 Sinks.One
Sinks.One<HttpStatus> sink = Sinks.one();
// subscribe to the request
request().subscribe(sink::tryEmitValue, sink::tryEmitError);
// subscribe to the timeout
Mono.just(HttpStatus.ACCEPTED).delayElement(Duration.ofSeconds(1))
.subscribe(sink::tryEmitValue);
// subscribe to the sink
sink.asMono()
.subscribe(status -> {
System.out.println("Success with status : " + status);
}, error -> {
System.out.println("Error : " + error.getMessage());
});