在 Flux 上使用 map() 的 Reactor 不会推送元素,但 .flapMap() 会

Reactor using map() on a Flux does not push elements but .flapMap() does

我想即使在多次阅读 javadocs 之后我也没有得到 map 和 flatMap 除了同步和异步之外的区别 transforms.In 下面的代码我没有得到任何事件(它表现为如果 subscribe() 不存在。

final Flux<GroupedFlux<String, TData>> groupedFlux =
            flux.groupBy(Event::getPartitionKey);
    groupedFlux.subscribe(g -> g.delayElements(Duration.ofMillis(100))
            .map(this::doWork)
            .doOnError(throwable -> log.error("error: ", throwable))
            .onErrorResume(e -> Mono.empty())
            .subscribe());

但是 flapMap() 有效。这很好用-

final Flux<GroupedFlux<String, TData>> groupedFlux =
            flux.groupBy(Event::getPartitionKey);
    groupedFlux.subscribe(g -> g.delayElements(Duration.ofMillis(100))
            .flatMap(this::doWork)
            .doOnError(throwable -> log.error("error: ", throwable))
            .onErrorResume(e -> Mono.empty())
            .subscribe());

这是为什么?

编辑: 按照评论中的建议添加了 doWork 方法的示例代码。

private Mono<Object> doWork(Object event) {
    // do some work and possibly return Mono<Object> or
    return Mono.empty();
}

我认为这是因为您的 doWork 方法 return 是一个 Mono。 map 操作隐式地将你的 returned 对象包装在一个 Mono 中,所以你得到一个 Mono。由于您的原始流程订阅了包装器 Mono,但其中的那个没有订阅它,因此它永远不会产生任何东西。相比之下,flatMap 需要明确的包装。

尝试将您的 doWork 方法修改为 return 而不是 Mono 并在 flatMap 操作中执行显式 Mono.just。