如果 Reactor Flux 未在设定时间内完成,则抛出异常
Throw an exception if a Reactor Flux doesn't complete in a set time
我有一个可能很长的 运行 Flux,我想在一段时间后停止。我已经找到了几种执行此操作的方法,但是我正在努力解决的是如何判断 Flux 超时而不是自然完成。
示例(非常简单)代码:
Flux.range(0, 10000)
.take(Duration.ofMillis(1))
.doOnNext(System.out::println)
.collectList()
.block();
我想要的是这样的:
Flux.range(0, 10000)
.take(Duration.ofMillis(1))
.doOnNext(System.out::println)
.doOnError(t -> {
if (t instanceof TimeoutException) {
System.out.println("I timed out");
}
})
.collectList()
.block();
然而 take
似乎没有错误通知;我所看到的只是一个 terminate
和 complete
信号,如果我不在 Flux 中包含 take
并让它自然完成,我就会得到这个信号。
我已经简要地查看了 timeout
运算符,它确实会抛出异常,但是 timeout
看起来它只会在 Flux 不发出单个元素时抛出异常某个时间,而不是如果整个 Flux 在某个时间没有完成。
有没有人有任何提示或示例来说明他们是如何解决这个问题的?
提前致谢!
除了使用 take()
,您还可以使用 .mergeWith(Flux.never().timeout(Duration.ofMillis(500)))
将您的 flux 与另一个在特定时间段后始终抛出超时异常的 flux 合并。
举个例子,你会做这样的事情:
Flux.range(0, 10000)
.delayElements(Duration.ofMillis(10))
.mergeWith(Flux.<Integer>never().timeout(Duration.ofMillis(500)))
.doOnNext(System.out::println)
.doOnError(t -> {
if (t instanceof TimeoutException) {
System.out.println("I timed out");
}
})
.collectList()
.block();
...这会给你这样的东西:
(...snip)
24
25
26
27
28
29
30
31
I timed out
已更新
您可以使用 takeUntilOther
运算符在给定时间后发出错误信号:
Duration timeout = Duration.ofMillis(500);
Flux.range(0, 10000)
.delayElements(Duration.ofMillis(10))
.doOnNext(System.out::println)
.takeUntilOther(Mono.delay(timeout).then(Mono.error(new TimeoutException())))
// .takeUntilOther(Mono.never().timeout(Duration.ofMillis(500))) // based on Michael's answer this is also an option
.doOnError(t -> {
if (t instanceof TimeoutException) {
System.out.println("I timed out");
}
})
.blockLast();
如果使用 .collectList()
或 .then()
运算符将 Flux
转换为 Mono
,那么您可以在之后简单地应用 .timeout(...)
运算符,然后它会按照您的要求运行:
Flux.range(0, 10000)
.delayElements(Duration.ofMillis(10))
.doOnNext(System.out::println)
.collectList()
.timeout(Duration.ofMillis(500))
.doOnError(t -> {
if (t instanceof TimeoutException) {
System.out.println("I timed out");
}
})
.block();
我有一个可能很长的 运行 Flux,我想在一段时间后停止。我已经找到了几种执行此操作的方法,但是我正在努力解决的是如何判断 Flux 超时而不是自然完成。
示例(非常简单)代码:
Flux.range(0, 10000)
.take(Duration.ofMillis(1))
.doOnNext(System.out::println)
.collectList()
.block();
我想要的是这样的:
Flux.range(0, 10000)
.take(Duration.ofMillis(1))
.doOnNext(System.out::println)
.doOnError(t -> {
if (t instanceof TimeoutException) {
System.out.println("I timed out");
}
})
.collectList()
.block();
然而 take
似乎没有错误通知;我所看到的只是一个 terminate
和 complete
信号,如果我不在 Flux 中包含 take
并让它自然完成,我就会得到这个信号。
我已经简要地查看了 timeout
运算符,它确实会抛出异常,但是 timeout
看起来它只会在 Flux 不发出单个元素时抛出异常某个时间,而不是如果整个 Flux 在某个时间没有完成。
有没有人有任何提示或示例来说明他们是如何解决这个问题的?
提前致谢!
除了使用 take()
,您还可以使用 .mergeWith(Flux.never().timeout(Duration.ofMillis(500)))
将您的 flux 与另一个在特定时间段后始终抛出超时异常的 flux 合并。
举个例子,你会做这样的事情:
Flux.range(0, 10000)
.delayElements(Duration.ofMillis(10))
.mergeWith(Flux.<Integer>never().timeout(Duration.ofMillis(500)))
.doOnNext(System.out::println)
.doOnError(t -> {
if (t instanceof TimeoutException) {
System.out.println("I timed out");
}
})
.collectList()
.block();
...这会给你这样的东西:
(...snip)
24
25
26
27
28
29
30
31
I timed out
已更新
您可以使用 takeUntilOther
运算符在给定时间后发出错误信号:
Duration timeout = Duration.ofMillis(500);
Flux.range(0, 10000)
.delayElements(Duration.ofMillis(10))
.doOnNext(System.out::println)
.takeUntilOther(Mono.delay(timeout).then(Mono.error(new TimeoutException())))
// .takeUntilOther(Mono.never().timeout(Duration.ofMillis(500))) // based on Michael's answer this is also an option
.doOnError(t -> {
if (t instanceof TimeoutException) {
System.out.println("I timed out");
}
})
.blockLast();
如果使用 .collectList()
或 .then()
运算符将 Flux
转换为 Mono
,那么您可以在之后简单地应用 .timeout(...)
运算符,然后它会按照您的要求运行:
Flux.range(0, 10000)
.delayElements(Duration.ofMillis(10))
.doOnNext(System.out::println)
.collectList()
.timeout(Duration.ofMillis(500))
.doOnError(t -> {
if (t instanceof TimeoutException) {
System.out.println("I timed out");
}
})
.block();