在 Project Reactor 中对 doOnNext 执行并忘记操作

Fire and forget Action on doOnNext in Project Reactor

我有一个 Flux 流。对于处理的每个元素,我都希望触发一个 asynchronous/non-blocking 动作。例如,从数据库更新返回 Mono 的方法。 我希望此操作在 doOnNext 块上完成。 我不想影响 Flux、在那里实施的处理和背压。

假设要调用的Mono方法是

Mono<Integer> dbUpdate();

我的Flux应该是这样的吗?

public Flux<Data> processData(PollRequest request)
{
    return searchService.search(request)              
                        .doOnNext(data -> dbUpdate(data));
}

或者应该如堆栈溢出所述

public Flux<Data> processData(PollRequest request)
{
    return searchService.search(request)              
                        .doOnNext(data -> dbUpdate(data).subscribe());
}

以上不会导致 doOnNext 内部出现阻塞问题吗?

还有哪个调度程序最适合用于此类操作?

dbUpdate()不订阅将被忽略。以下代码段不会打印任何内容,因为 Mono.just("db update") 未被订阅。

Mono<String> dbUpdate() {
    return Mono.just("db update")
        .doOnNext(System.out::println);
}

public Flux<String> processData() {
    return Flux.just("item 1", "item 2")
        .doOnNext(data -> dbUpdate());
}

请注意 .subscribe() 不会阻塞您的线程,它会立即启动工作并 returns。