从 spring webflux 反应管道中提取变量

Extract variable from spring webflux reactive pipeline

我正在使用 Spring webflux 处理反应流。我想从反应管道的中间提取一个变量(name)并在不同的地方使用它,如下所示。

public class Example {

    public Mono<String> test() {
        String name;

        return Mono.just("some random string")
                    .map(s -> {
                        name = s.toUpperCase(); 
                        return name;
                    }).map(...)
                      .map(...)
                      .flatMap(...)
                      .map(...)
                      .map(result -> result+name)
                      .doOnSuccess(res -> asyncPublish(name));
     
     public void asyncPublish(String name) {
        // logic to write to a Messaging queue asynchronously
     }
    }
}

以上代码无效。这是一个人为的例子,但展示了我想要实现的目标。

注意: 我不想使用多个 zip 运算符只是为了将 name 一直带到我想要的最后一张地图使用它。有没有一种方法可以将它存储在一个变量中,如上所示,然后在我需要的任何地方使用它。

例如,您可以使用 Tuple2name 的值与修改后的数据一起通过链传递。

return Mono.just("some random string")
            .map(s -> s.toUpperCase())
            .map(s -> Tuples.of(s, x(s))) // given that x(s) is the transformation of this map statement
            .map(...) // keeping Tuple with the value of `name` in the first slot...
            .flatMap(...) // keeping Tuple with the value of `name` in the first slot...
            .map(resultTuple -> Tuples.of(resultTuple.getT1(), resultTuple.getT2() + resultTuple.getT1()) // keeping Tuple with the value of `name` in the first slot...
            .doOnSuccess(resultTuple -> asyncPublish(resultTuple.getT1()))
            .map(resultTuple -> resultTuple.getT2()); // in case that the returned Mono should contain the modified value...

Tuples 来自包 reactor.util.function 和 reactor-core 的一部分。

另一种方法(不使用元组通过链传递值)可能是使用 AtomicReference(但我仍然认为元组方式更简洁)。 AtomicReference 方式可能如下所示:

public Mono<String> test() {
    final AtomicReference<String> nameRef = new AtomicReference<>();

    return Mono.just("some random string")
                .map(s -> {
                    final String name = s.toUpperCase(); 
                    nameRef.set(name);
                    return name;
                }).map(...)
                  .map(...)
                  .flatMap(...)
                  .map(...)
                  .map(result -> result+nameRef.get())
                  .doOnSuccess(res -> asyncPublish(nameRef.get()));
 
 public void asyncPublish(String name) {
    // logic to write to a Messaging queue asynchronously
 }
}