如何在不为空时重复单声道

How to repeat Mono while not empty

我有一个方法returns像这样!

Mono<Integer> getNumberFromSomewhere();

我需要一直调用它,直到它没有更多的项目可以发出。那就是我需要将其设为 Flux<Integer>.

一个选项是添加 repeat。关键是 - 我想在上述方法发出第一个空信号时停止。

有什么办法吗?我正在寻找一种干净的方法。

我不知道有哪个内置运算符可以直接完成这项工作。但是,可以使用包装器 class 和运算符的组合来完成:

Flux<Integer> repeatUntilEmpty() {
    return getNumberFromSomewhere()
        .map(ResultWrapper::new)
        .defaultIfEmpty(ResultWrapper.EMPTY)
        .repeat()
        .takeWhile(ResultWrapper::isNotEmpty)
}

// helper class, not necessarily needs to be Java record
record ResultWrapper(Integer value) {
    public static final ResultWrapper EMPTY = new ResultWrapper(null);

    public boolean isNotEmpty() {
        return value != null;
    }
}

执行此操作的内置运算符(尽管它旨在用于“更深层次”的嵌套)是 expand

expand当返回的Publisher完全为空时自然停止展开。

您可以像这样将其应用于您的用例:

//this changes each time one subscribes to it
Mono<Integer> monoWithUnderlyingState;

Flux<Integer> repeated = monoWithUnderlyingState
    .expand(i -> monoWithUnderlyingState);