在 Project Reactor 中对 doOnNext 执行并忘记操作
Fire and forget Action on doOnNext in Project Reactor
我有一个 Flux
流。对于处理的每个元素,我都希望触发一个 asynchronous/non-blocking 动作。例如,从数据库更新返回 Mono
的方法。
我希望此操作在 doOnNext
块上完成。
我不想影响 Flux
、在那里实施的处理和背压。
假设要调用的Mono
方法是
Mono<Integer> dbUpdate();
我的Flux
应该是这样的吗?
public Flux<Data> processData(PollRequest request)
{
return searchService.search(request)
.doOnNext(data -> dbUpdate(data));
}
或者应该如堆栈溢出所述。
public Flux<Data> processData(PollRequest request)
{
return searchService.search(request)
.doOnNext(data -> dbUpdate(data).subscribe());
}
以上不会导致 doOnNext
内部出现阻塞问题吗?
还有哪个调度程序最适合用于此类操作?
dbUpdate()
不订阅将被忽略。以下代码段不会打印任何内容,因为 Mono.just("db update")
未被订阅。
Mono<String> dbUpdate() {
return Mono.just("db update")
.doOnNext(System.out::println);
}
public Flux<String> processData() {
return Flux.just("item 1", "item 2")
.doOnNext(data -> dbUpdate());
}
请注意 .subscribe()
不会阻塞您的线程,它会立即启动工作并 returns。
我有一个 Flux
流。对于处理的每个元素,我都希望触发一个 asynchronous/non-blocking 动作。例如,从数据库更新返回 Mono
的方法。
我希望此操作在 doOnNext
块上完成。
我不想影响 Flux
、在那里实施的处理和背压。
假设要调用的Mono
方法是
Mono<Integer> dbUpdate();
我的Flux
应该是这样的吗?
public Flux<Data> processData(PollRequest request)
{
return searchService.search(request)
.doOnNext(data -> dbUpdate(data));
}
或者应该如堆栈溢出所述
public Flux<Data> processData(PollRequest request)
{
return searchService.search(request)
.doOnNext(data -> dbUpdate(data).subscribe());
}
以上不会导致 doOnNext
内部出现阻塞问题吗?
还有哪个调度程序最适合用于此类操作?
dbUpdate()
不订阅将被忽略。以下代码段不会打印任何内容,因为 Mono.just("db update")
未被订阅。
Mono<String> dbUpdate() {
return Mono.just("db update")
.doOnNext(System.out::println);
}
public Flux<String> processData() {
return Flux.just("item 1", "item 2")
.doOnNext(data -> dbUpdate());
}
请注意 .subscribe()
不会阻塞您的线程,它会立即启动工作并 returns。