Why do the timeouts (timeout(Duration timeout)) of Mono/Flux fire based on accumulated time?
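My case is as follows:
I have two network requests, request1 and request2, where the input of request2 comes from the output of request1. I want to set a timeout for each of them. Ideally request1 takes 2s and request2 takes 3s, so I want to set the timeout of request1 to 3s and the timeout of request2 to 4s. From the documentation of Mono#timeout I assumed this was possible, but unfortunately the second timeout is calculated cumulatively. So I am confused about the meaning of "this Mono".
Documentation of Mono#timeout(Duration timeout) (https://projectreactor.io/docs/core/release/api/reactor/core/publisher/Mono.html#timeout-java.time.Duration-):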
public final Mono<T> timeout(Duration timeout)
Propagate a TimeoutException in case no item arrives within the given Duration.
Parameters:
timeout - the timeout before the onNext signal from this Mono
Returns: a Mono that can time out
Sample code for my case:
Mono<String> startMono = Mono.just("start");
String result = startMono
        .map(x -> {
            log.info("received message: {}", x);
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            return "#1 enriched: " + x;
        })
        .timeout(Duration.ofSeconds(3))
        .onErrorResume(throwable -> {
            log.warn("Caught exception, apply fallback behavior #1", throwable);
            return Mono.just("item from backup #1");
        })
        .map(y -> {
            log.info("received message: {}", y);
            try {
                Thread.sleep(3000);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            return "#2 enriched: " + y;
        })
        .timeout(Duration.ofSeconds(4))
        // there is no timeoutException thrown if I set the second timeout to 6s (6s > 2s + 3s)
        // .timeout(Duration.ofSeconds(6))
        .onErrorResume(throwable -> {
            log.warn("Caught exception, apply fallback behavior #2", throwable);
            return Mono.just("item from backup #2");
        })
        .block();
log.info("result: {}", result);
Output and exception produced by the code above:
16:46:51.080 [main] INFO MonoDemo - received message: start
16:46:53.095 [elastic-2] INFO MonoDemo - received message: #1 enriched: start
16:46:55.079 [parallel-1] WARN MonoDemo - Caught exception, apply fallback behavior #2
java.util.concurrent.TimeoutException: Did not observe any item or terminal signal within 4000ms in 'flatMap' (and no fallback has been configured)
at reactor.core.publisher.FluxTimeout$TimeoutMainSubscriber.handleTimeout(FluxTimeout.java:288) [reactor-core-3.3.10.RELEASE.jar:3.3.10.RELEASE]
at reactor.core.publisher.FluxTimeout$TimeoutMainSubscriber.doTimeout(FluxTimeout.java:273) [reactor-core-3.3.10.RELEASE.jar:3.3.10.RELEASE]
at reactor.core.publisher.FluxTimeout$TimeoutTimeoutSubscriber.onNext(FluxTimeout.java:395) [reactor-core-3.3.10.RELEASE.jar:3.3.10.RELEASE]
at reactor.core.publisher.StrictSubscriber.onNext(StrictSubscriber.java:89) [reactor-core-3.3.10.RELEASE.jar:3.3.10.RELEASE]
at reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onNext(FluxOnErrorResume.java:73) [reactor-core-3.3.10.RELEASE.jar:3.3.10.RELEASE]
at reactor.core.publisher.MonoDelay$MonoDelayRunnable.run(MonoDelay.java:117) [reactor-core-3.3.10.RELEASE.jar:3.3.10.RELEASE]
at reactor.core.scheduler.SchedulerTask.call(SchedulerTask.java:68) [reactor-core-3.3.10.RELEASE.jar:3.3.10.RELEASE]
at reactor.core.scheduler.SchedulerTask.call(SchedulerTask.java:28) [reactor-core-3.3.10.RELEASE.jar:3.3.10.RELEASE]
at java.util.concurrent.FutureTask.run(FutureTask.java:264) [?:?]
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304) [?:?]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) [?:?]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) [?:?]
at java.lang.Thread.run(Thread.java:834) [?:?]
16:46:55.095 [main] INFO MonoDemo - result: item from backup #2
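The timeout operator measures the time from subscription until the first signal arrives. In your chain, the second timeout is subscribed together with everything upstream of it, so its 4s window also includes the 2s spent in the first step (2s + 3s = 5s > 4s). The log shows this: processing starts at 16:46:51 and the TimeoutException fires at 16:46:55.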
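To make that scope concrete, here is a minimal sketch with the durations scaled down to milliseconds (200 ms and 300 ms standing in for the 2s and 3s requests, 400 ms for the 4s timeout). The class name `ChainTimeoutDemo` and the helper `stage` are hypothetical, and `Mono.delay` is only an assumed stand-in for a real network call.

```java
import java.time.Duration;
import reactor.core.publisher.Mono;

public class ChainTimeoutDemo {

    // Hypothetical stand-in for a network request that answers after `millis`.
    static Mono<String> stage(String value, long millis) {
        return Mono.delay(Duration.ofMillis(millis)).thenReturn(value);
    }

    // The timeout sits at the end of the chain, so its clock starts when the
    // whole chain is subscribed and covers both stages:
    // 200 ms + 300 ms = 500 ms against a 400 ms limit.
    static String chainLevelTimeout() {
        return stage("first", 200)
                .flatMap(first -> stage(first + " -> second", 300))
                .timeout(Duration.ofMillis(400))
                .onErrorReturn("fallback")
                .block();
    }

    public static void main(String[] args) {
        System.out.println(chainLevelTimeout());
    }
}
```

Run as written, this should print `fallback`, even though neither stage on its own comes close to the 400 ms limit.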
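If you want the timeout to apply only to the second operation, you need to place the timeout operator where only the second request is in scope. See the following: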
public void execute() {
    firstRequest()
            .onErrorResume(throwable -> secondRequest())
            .onErrorReturn("some static fallback value if second failed as well")
            .block();
}

private Mono<String> firstRequest() {
    return Mono.delay(Duration.ofSeconds(2))
            .thenReturn("first")
            .timeout(Duration.ofSeconds(3));
    // additional mapping can be done here related to first request
}

private Mono<String> secondRequest() {
    return Mono.delay(Duration.ofSeconds(3))
            .thenReturn("second")
            .timeout(Duration.ofSeconds(4));
    // additional mapping can be done here related to second request
}
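By moving the timeout operators into the private methods, we ensure that only the duration of those specific Monos is measured, not that of the whole chain.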
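Since request2 in the question consumes the output of request1, the same scoping idea can also be expressed with `flatMap`, keeping each `timeout` attached to its own inner Mono. The following is a sketch under assumptions rather than a definitive implementation: `request1`, `request2` and the class name `ScopedTimeoutDemo` are hypothetical, `Mono.delay` simulates the latency, and the durations are scaled down to milliseconds so that it runs quickly.

```java
import java.time.Duration;
import reactor.core.publisher.Mono;

public class ScopedTimeoutDemo {

    // Hypothetical request1: answers after 200 ms and is allowed 300 ms.
    static Mono<String> request1() {
        return Mono.delay(Duration.ofMillis(200))
                .thenReturn("first")
                .timeout(Duration.ofMillis(300))
                .onErrorReturn("item from backup #1");
    }

    // Hypothetical request2: consumes request1's output, answers after
    // 300 ms and is allowed 400 ms.
    static Mono<String> request2(String input) {
        return Mono.delay(Duration.ofMillis(300))
                .thenReturn("second(" + input + ")")
                .timeout(Duration.ofMillis(400))
                .onErrorReturn("item from backup #2");
    }

    static String run() {
        // The inner Mono is created and subscribed only after request1 emits,
        // so request2's 400 ms clock starts at that moment.
        return request1()
                .flatMap(ScopedTimeoutDemo::request2)
                .block();
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

With these numbers neither timeout should fire, and the result is `second(first)`; each fallback is reached only if its own request exceeds its own limit.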