将 Flux 的结果合并为 Mono 的结果

Combining result from Flux to result from Mono

我开始使用 Project reactor,而我遇到的一个小问题是如何将来自 Mono 的东西与 Flux 结合起来。这是我的用例:

public interface GroupRepository {
       Mono<GroupModel> getGroup(Long groupId);
}

public interface UserRepository {
       Flux<User> getUsers(Set<Long> userIds);   
}

Mono<GroupModel> groupMono = getGroup(groupId);
Flux<User> userFlux = getUsers(Set<Long> users);
//run above instrtuction in parallel and associate user to group.

现在我要实现的是:

我如何结合 UserFlux 的响应并将这些用户与该组相关联,例如 group.addUsers(userfromFlux)。

有人可以帮助了解如何合并来自 userFlux 和 groupMono 的结果。我想我使用类似 Zip 的东西,但它会从源进行一对一映射。就我而言,我需要进行 1 到 N 映射。在这里,我有一个组,但我需要将多个用户添加到该组。 return Mono<List<Users> 然后将 zip 运算符与 mono 一起使用并提供此处提到的组合器是个好主意
public static <T1, T2, O> Flux<O> zip(Publisher<? extends T1> source1, Publisher<? extends T2> source2, final BiFunction<? super T1, ? super T2, ? extends O> combinator)?

我认为 Flux.combineLatest 静态方法可以帮助你:因为你的 Mono 只发出 1 个元素,该元素将始终是与来自 [=13= 的每个传入值相结合的元素].

Flux.combineLatest(arr -> new Combination((GroupModel) arr[0], (User) arr[1]),
                   groupMono, userFlux);

为其他人添加答案,我用过,Flux.zip(groupMono, userMono(holding list of users), this::biFunctionToPopulateGroupWithUsers)。我使用了这种方法而不是@Simon 建议的方法,因为持有用户的底层组是 HashSet 并且以反应方式添加用户将不是线程安全的。但是如果你有一个线程安全的数据结构,我会使用@Simon 的建议。

这个 1 到 N 的映射听起来类似于我在这里回答的问题:

Can you Flux.zip a mono and a flux and and repeat the mono value for every flux value?

以防 link 出现故障,这里再次给出答案。我不认为这种方法会有很好的性能,因为每次都会重新计算单声道。为了获得更好的性能,如果您的 Mono 绕过一个缓慢的操作,那么拥有一些缓存层可能会很好。

假设您有这样的助焊剂和单声道:

 // a flux that contains 6 elements.
 final Flux<Integer> userIds = Flux.fromIterable(List.of(1,2,3,4,5,6));

 // a mono of 1 element.
 final Mono<String> groupLabel = Mono.just("someGroupLabel");

首先,我将向您展示尝试压缩 2 的错误方法,我想其他人也会尝试:

 // wrong way - this will only emit 1 event 
 final Flux<Tuple2<Integer, String>> wrongWayOfZippingFluxToMono = userIds
         .zipWith(groupLabel);

 // you'll see that onNext() is only called once, 
 //     emitting 1 item from the mono and first item from the flux.
 wrongWayOfZippingFluxToMono
         .log()
         .subscribe();

 // this is how to zip up the flux and mono how you'd want, 
 //     such that every time the flux emits, the mono emits. 
 final Flux<Tuple2<Integer, String>> correctWayOfZippingFluxToMono = userIds
         .flatMap(userId -> Mono.just(userId)
                 .zipWith(groupLabel));

 // you'll see that onNext() is called 6 times here, as desired. 
 correctWayOfZippingFluxToMono
         .log()
         .subscribe();

Flux.fromIterable(List.of(1,2,3,4,5,6))
      .zipWith(Mono.just("groupLabel").cache().repeat())

会将您的标签压缩到 flux

发出的每个值