如果 Reactor Flux 未在设定时间内完成,则抛出异常

Throw an exception if a Reactor Flux doesn't complete in a set time

我有一个可能很长的 运行 Flux,我想在一段时间后停止。我已经找到了几种执行此操作的方法,但是我正在努力解决的是如何判断 Flux 超时而不是自然完成。

示例(非常简单)代码:

Flux.range(0, 10000)
        .take(Duration.ofMillis(1))
        .doOnNext(System.out::println)
        .collectList()
        .block();

我想要的是这样的:

Flux.range(0, 10000)
        .take(Duration.ofMillis(1))
        .doOnNext(System.out::println)
        .doOnError(t -> {
            if (t instanceof TimeoutException) {
                System.out.println("I timed out");
            }
        })
        .collectList()
        .block();

然而 take 似乎没有错误通知;我所看到的只是一个 terminatecomplete 信号,如果我不在 Flux 中包含 take 并让它自然完成,我就会得到这个信号。

我已经简要地查看了 timeout 运算符,它确实会抛出异常,但是 timeout 看起来它只会在 Flux 不发出单个元素时抛出异常某个时间,而不是如果整个 Flux 在某个时间没有完成。

有没有人有任何提示或示例来说明他们是如何解决这个问题的?

提前致谢!

除了使用 take(),您还可以使用 .mergeWith(Flux.never().timeout(Duration.ofMillis(500))) 将您的 flux 与另一个在特定时间段后始终抛出超时异常的 flux 合并。

举个例子,你会做这样的事情:

Flux.range(0, 10000)
        .delayElements(Duration.ofMillis(10))
        .mergeWith(Flux.<Integer>never().timeout(Duration.ofMillis(500)))
        .doOnNext(System.out::println)
        .doOnError(t -> {
            if (t instanceof TimeoutException) {
                System.out.println("I timed out");
            }
        })
        .collectList()
        .block();

...这会给你这样的东西:

(...snip)
24
25
26
27
28
29
30
31
I timed out

已更新

您可以使用 takeUntilOther 运算符在给定时间后发出错误信号:

Duration timeout = Duration.ofMillis(500);
Flux.range(0, 10000)
    .delayElements(Duration.ofMillis(10))
    .doOnNext(System.out::println)
    .takeUntilOther(Mono.delay(timeout).then(Mono.error(new TimeoutException())))
    // .takeUntilOther(Mono.never().timeout(Duration.ofMillis(500))) // based on Michael's answer this is also an option
    .doOnError(t -> {
        if (t instanceof TimeoutException) {
            System.out.println("I timed out");
        }
    })
    .blockLast();

如果使用 .collectList().then() 运算符将 Flux 转换为 Mono,那么您可以在之后简单地应用 .timeout(...) 运算符,然后它会按照您的要求运行:

Flux.range(0, 10000)
    .delayElements(Duration.ofMillis(10))
    .doOnNext(System.out::println)
    .collectList()
    .timeout(Duration.ofMillis(500))
    .doOnError(t -> {
        if (t instanceof TimeoutException) {
            System.out.println("I timed out");
        }
    })
    .block();