将 Flux 的结果合并为 Mono 的结果
Combining result from Flux to result from Mono
我开始使用 Project reactor,而我遇到的一个小问题是如何将来自 Mono 的东西与 Flux 结合起来。这是我的用例:
public interface GroupRepository {
Mono<GroupModel> getGroup(Long groupId);
}
public interface UserRepository {
Flux<User> getUsers(Set<Long> userIds);
}
Mono<GroupModel> groupMono = getGroup(groupId);
Flux<User> userFlux = getUsers(Set<Long> users);
//run above instrtuction in parallel and associate user to group.
现在我要实现的是:
我如何结合 UserFlux 的响应并将这些用户与该组相关联,例如 group.addUsers(userfromFlux)。
有人可以帮助了解如何合并来自 userFlux 和 groupMono 的结果。我想我使用类似 Zip 的东西,但它会从源进行一对一映射。就我而言,我需要进行 1 到 N 映射。在这里,我有一个组,但我需要将多个用户添加到该组。 return Mono<List<Users>
然后将 zip 运算符与 mono 一起使用并提供此处提到的组合器是个好主意
public static <T1, T2, O> Flux<O> zip(Publisher<? extends T1> source1,
Publisher<? extends T2> source2,
final BiFunction<? super T1, ? super T2, ? extends O> combinator)
?
我认为 Flux.combineLatest
静态方法可以帮助你:因为你的 Mono
只发出 1 个元素,该元素将始终是与来自 [=13= 的每个传入值相结合的元素].
Flux.combineLatest(arr -> new Combination((GroupModel) arr[0], (User) arr[1]),
groupMono, userFlux);
为其他人添加答案,我用过,Flux.zip(groupMono, userMono(holding list of users), this::biFunctionToPopulateGroupWithUsers)
。我使用了这种方法而不是@Simon 建议的方法,因为持有用户的底层组是 HashSet
并且以反应方式添加用户将不是线程安全的。但是如果你有一个线程安全的数据结构,我会使用@Simon 的建议。
这个 1 到 N 的映射听起来类似于我在这里回答的问题:
Can you Flux.zip a mono and a flux and and repeat the mono value for every flux value?
以防 link 出现故障,这里再次给出答案。我不认为这种方法会有很好的性能,因为每次都会重新计算单声道。为了获得更好的性能,如果您的 Mono 绕过一个缓慢的操作,那么拥有一些缓存层可能会很好。
假设您有这样的助焊剂和单声道:
// a flux that contains 6 elements.
final Flux<Integer> userIds = Flux.fromIterable(List.of(1,2,3,4,5,6));
// a mono of 1 element.
final Mono<String> groupLabel = Mono.just("someGroupLabel");
首先,我将向您展示尝试压缩 2 的错误方法,我想其他人也会尝试:
// wrong way - this will only emit 1 event
final Flux<Tuple2<Integer, String>> wrongWayOfZippingFluxToMono = userIds
.zipWith(groupLabel);
// you'll see that onNext() is only called once,
// emitting 1 item from the mono and first item from the flux.
wrongWayOfZippingFluxToMono
.log()
.subscribe();
// this is how to zip up the flux and mono how you'd want,
// such that every time the flux emits, the mono emits.
final Flux<Tuple2<Integer, String>> correctWayOfZippingFluxToMono = userIds
.flatMap(userId -> Mono.just(userId)
.zipWith(groupLabel));
// you'll see that onNext() is called 6 times here, as desired.
correctWayOfZippingFluxToMono
.log()
.subscribe();
Flux.fromIterable(List.of(1,2,3,4,5,6))
.zipWith(Mono.just("groupLabel").cache().repeat())
会将您的标签压缩到 flux
发出的每个值
我开始使用 Project reactor,而我遇到的一个小问题是如何将来自 Mono 的东西与 Flux 结合起来。这是我的用例:
public interface GroupRepository {
Mono<GroupModel> getGroup(Long groupId);
}
public interface UserRepository {
Flux<User> getUsers(Set<Long> userIds);
}
Mono<GroupModel> groupMono = getGroup(groupId);
Flux<User> userFlux = getUsers(Set<Long> users);
//run above instrtuction in parallel and associate user to group.
现在我要实现的是:
我如何结合 UserFlux 的响应并将这些用户与该组相关联,例如 group.addUsers(userfromFlux)。
有人可以帮助了解如何合并来自 userFlux 和 groupMono 的结果。我想我使用类似 Zip 的东西,但它会从源进行一对一映射。就我而言,我需要进行 1 到 N 映射。在这里,我有一个组,但我需要将多个用户添加到该组。 return Mono<List<Users>
然后将 zip 运算符与 mono 一起使用并提供此处提到的组合器是个好主意
public static <T1, T2, O> Flux<O> zip(Publisher<? extends T1> source1,
Publisher<? extends T2> source2,
final BiFunction<? super T1, ? super T2, ? extends O> combinator)
?
我认为 Flux.combineLatest
静态方法可以帮助你:因为你的 Mono
只发出 1 个元素,该元素将始终是与来自 [=13= 的每个传入值相结合的元素].
Flux.combineLatest(arr -> new Combination((GroupModel) arr[0], (User) arr[1]),
groupMono, userFlux);
为其他人添加答案,我用过,Flux.zip(groupMono, userMono(holding list of users), this::biFunctionToPopulateGroupWithUsers)
。我使用了这种方法而不是@Simon 建议的方法,因为持有用户的底层组是 HashSet
并且以反应方式添加用户将不是线程安全的。但是如果你有一个线程安全的数据结构,我会使用@Simon 的建议。
这个 1 到 N 的映射听起来类似于我在这里回答的问题:
Can you Flux.zip a mono and a flux and and repeat the mono value for every flux value?
以防 link 出现故障,这里再次给出答案。我不认为这种方法会有很好的性能,因为每次都会重新计算单声道。为了获得更好的性能,如果您的 Mono 绕过一个缓慢的操作,那么拥有一些缓存层可能会很好。
假设您有这样的助焊剂和单声道:
// a flux that contains 6 elements.
final Flux<Integer> userIds = Flux.fromIterable(List.of(1,2,3,4,5,6));
// a mono of 1 element.
final Mono<String> groupLabel = Mono.just("someGroupLabel");
首先,我将向您展示尝试压缩 2 的错误方法,我想其他人也会尝试:
// wrong way - this will only emit 1 event
final Flux<Tuple2<Integer, String>> wrongWayOfZippingFluxToMono = userIds
.zipWith(groupLabel);
// you'll see that onNext() is only called once,
// emitting 1 item from the mono and first item from the flux.
wrongWayOfZippingFluxToMono
.log()
.subscribe();
// this is how to zip up the flux and mono how you'd want,
// such that every time the flux emits, the mono emits.
final Flux<Tuple2<Integer, String>> correctWayOfZippingFluxToMono = userIds
.flatMap(userId -> Mono.just(userId)
.zipWith(groupLabel));
// you'll see that onNext() is called 6 times here, as desired.
correctWayOfZippingFluxToMono
.log()
.subscribe();
Flux.fromIterable(List.of(1,2,3,4,5,6))
.zipWith(Mono.just("groupLabel").cache().repeat())
会将您的标签压缩到 flux
发出的每个值