从 spring webflux 反应管道中提取变量
Extract variable from spring webflux reactive pipeline
我正在使用 Spring webflux 处理反应流。我想从反应管道的中间提取一个变量(name
)并在不同的地方使用它,如下所示。
public class Example {
public Mono<String> test() {
String name;
return Mono.just("some random string")
.map(s -> {
name = s.toUpperCase();
return name;
}).map(...)
.map(...)
.flatMap(...)
.map(...)
.map(result -> result+name)
.doOnSuccess(res -> asyncPublish(name));
public void asyncPublish(String name) {
// logic to write to a Messaging queue asynchronously
}
}
}
以上代码无效。这是一个人为的例子,但展示了我想要实现的目标。
注意: 我不想使用多个 zip
运算符只是为了将 name
一直带到我想要的最后一张地图使用它。有没有一种方法可以将它存储在一个变量中,如上所示,然后在我需要的任何地方使用它。
例如,您可以使用 Tuple2 将 name
的值与修改后的数据一起通过链传递。
return Mono.just("some random string")
.map(s -> s.toUpperCase())
.map(s -> Tuples.of(s, x(s))) // given that x(s) is the transformation of this map statement
.map(...) // keeping Tuple with the value of `name` in the first slot...
.flatMap(...) // keeping Tuple with the value of `name` in the first slot...
.map(resultTuple -> Tuples.of(resultTuple.getT1(), resultTuple.getT2() + resultTuple.getT1()) // keeping Tuple with the value of `name` in the first slot...
.doOnSuccess(resultTuple -> asyncPublish(resultTuple.getT1()))
.map(resultTuple -> resultTuple.getT2()); // in case that the returned Mono should contain the modified value...
Tuples
来自包 reactor.util.function
和 reactor-core 的一部分。
另一种方法(不使用元组通过链传递值)可能是使用 AtomicReference
(但我仍然认为元组方式更简洁)。 AtomicReference 方式可能如下所示:
public Mono<String> test() {
final AtomicReference<String> nameRef = new AtomicReference<>();
return Mono.just("some random string")
.map(s -> {
final String name = s.toUpperCase();
nameRef.set(name);
return name;
}).map(...)
.map(...)
.flatMap(...)
.map(...)
.map(result -> result+nameRef.get())
.doOnSuccess(res -> asyncPublish(nameRef.get()));
public void asyncPublish(String name) {
// logic to write to a Messaging queue asynchronously
}
}
我正在使用 Spring webflux 处理反应流。我想从反应管道的中间提取一个变量(name
)并在不同的地方使用它,如下所示。
public class Example {
public Mono<String> test() {
String name;
return Mono.just("some random string")
.map(s -> {
name = s.toUpperCase();
return name;
}).map(...)
.map(...)
.flatMap(...)
.map(...)
.map(result -> result+name)
.doOnSuccess(res -> asyncPublish(name));
public void asyncPublish(String name) {
// logic to write to a Messaging queue asynchronously
}
}
}
以上代码无效。这是一个人为的例子,但展示了我想要实现的目标。
注意: 我不想使用多个 zip
运算符只是为了将 name
一直带到我想要的最后一张地图使用它。有没有一种方法可以将它存储在一个变量中,如上所示,然后在我需要的任何地方使用它。
例如,您可以使用 Tuple2 将 name
的值与修改后的数据一起通过链传递。
return Mono.just("some random string")
.map(s -> s.toUpperCase())
.map(s -> Tuples.of(s, x(s))) // given that x(s) is the transformation of this map statement
.map(...) // keeping Tuple with the value of `name` in the first slot...
.flatMap(...) // keeping Tuple with the value of `name` in the first slot...
.map(resultTuple -> Tuples.of(resultTuple.getT1(), resultTuple.getT2() + resultTuple.getT1()) // keeping Tuple with the value of `name` in the first slot...
.doOnSuccess(resultTuple -> asyncPublish(resultTuple.getT1()))
.map(resultTuple -> resultTuple.getT2()); // in case that the returned Mono should contain the modified value...
Tuples
来自包 reactor.util.function
和 reactor-core 的一部分。
另一种方法(不使用元组通过链传递值)可能是使用 AtomicReference
(但我仍然认为元组方式更简洁)。 AtomicReference 方式可能如下所示:
public Mono<String> test() {
final AtomicReference<String> nameRef = new AtomicReference<>();
return Mono.just("some random string")
.map(s -> {
final String name = s.toUpperCase();
nameRef.set(name);
return name;
}).map(...)
.map(...)
.flatMap(...)
.map(...)
.map(result -> result+nameRef.get())
.doOnSuccess(res -> asyncPublish(nameRef.get()));
public void asyncPublish(String name) {
// logic to write to a Messaging queue asynchronously
}
}