在 Flux 上使用 map() 的 Reactor 不会推送元素,但 .flapMap() 会
Reactor using map() on a Flux does not push elements but .flapMap() does
我想即使在多次阅读 javadocs 之后我也没有得到 map 和 flatMap 除了同步和异步之外的区别 transforms.In 下面的代码我没有得到任何事件(它表现为如果 subscribe() 不存在。
final Flux<GroupedFlux<String, TData>> groupedFlux =
flux.groupBy(Event::getPartitionKey);
groupedFlux.subscribe(g -> g.delayElements(Duration.ofMillis(100))
.map(this::doWork)
.doOnError(throwable -> log.error("error: ", throwable))
.onErrorResume(e -> Mono.empty())
.subscribe());
但是 flapMap() 有效。这很好用-
final Flux<GroupedFlux<String, TData>> groupedFlux =
flux.groupBy(Event::getPartitionKey);
groupedFlux.subscribe(g -> g.delayElements(Duration.ofMillis(100))
.flatMap(this::doWork)
.doOnError(throwable -> log.error("error: ", throwable))
.onErrorResume(e -> Mono.empty())
.subscribe());
这是为什么?
编辑:
按照评论中的建议添加了 doWork 方法的示例代码。
private Mono<Object> doWork(Object event) {
// do some work and possibly return Mono<Object> or
return Mono.empty();
}
我认为这是因为您的 doWork 方法 return 是一个 Mono。 map 操作隐式地将你的 returned 对象包装在一个 Mono 中,所以你得到一个 Mono。由于您的原始流程订阅了包装器 Mono,但其中的那个没有订阅它,因此它永远不会产生任何东西。相比之下,flatMap 需要明确的包装。
尝试将您的 doWork 方法修改为 return 而不是 Mono 并在 flatMap 操作中执行显式 Mono.just。
我想即使在多次阅读 javadocs 之后我也没有得到 map 和 flatMap 除了同步和异步之外的区别 transforms.In 下面的代码我没有得到任何事件(它表现为如果 subscribe() 不存在。
final Flux<GroupedFlux<String, TData>> groupedFlux =
flux.groupBy(Event::getPartitionKey);
groupedFlux.subscribe(g -> g.delayElements(Duration.ofMillis(100))
.map(this::doWork)
.doOnError(throwable -> log.error("error: ", throwable))
.onErrorResume(e -> Mono.empty())
.subscribe());
但是 flapMap() 有效。这很好用-
final Flux<GroupedFlux<String, TData>> groupedFlux =
flux.groupBy(Event::getPartitionKey);
groupedFlux.subscribe(g -> g.delayElements(Duration.ofMillis(100))
.flatMap(this::doWork)
.doOnError(throwable -> log.error("error: ", throwable))
.onErrorResume(e -> Mono.empty())
.subscribe());
这是为什么?
编辑: 按照评论中的建议添加了 doWork 方法的示例代码。
private Mono<Object> doWork(Object event) {
// do some work and possibly return Mono<Object> or
return Mono.empty();
}
我认为这是因为您的 doWork 方法 return 是一个 Mono。 map 操作隐式地将你的 returned 对象包装在一个 Mono 中,所以你得到一个 Mono
尝试将您的 doWork 方法修改为 return 而不是 Mono 并在 flatMap 操作中执行显式 Mono.just。