Project Reactor:用 Mono 的结果丰富 Flux 的结果
Project Reactor: Enriching the result of a Flux with the result of a Mono
我正在努力将响应式编程引入我的公司。我正在构建 activity 推荐系统的简单演示以证明其性能优势。
我遇到的挑战之一是用另一个流的结果丰富一个流中的结果。我有一个工作示例,如下所示,但我不确定这种方法是否存在任何问题。有人可以看看并提供任何可能的改进吗。
public Flux<Integer> getRecommendedActivities(Long userId, String location, Integer limit) {
Flux<ActivityData> activities = activityDatabaseService.getByLocation(location);
Mono<Map<String,BigInteger>> userCategoryScores = userScoresDatabaseService.get(userId);
return activities
.zipWith(userCategoryScores.cache().repeat(), this::scoreActivitiesBasedOnUserCategoryScores)
.sort(compareActivityScoreStrength)
.map(ScoredActivityData::getActivityId)
.take(limit);
}
private ScoredActivityData scoreActivitiesBasedOnUserCategoryScores(ActivityData deal,Map<String, BigInteger> categoryScores){
//This method combines the deal score and user category scores to come up with a final score
}
谢谢,
卡尔
你那里的代码没有本质上的错误。一些可能有用也可能没用的文体要点:
- 响应式编程的 "norm" 是使用流畅的风格并在整个过程中内联所有内容,而不是在方法顶部声明单独的局部变量并在响应链中使用它们。
x.zipWith(y.cache().repeat())
模式可以很好地工作,但如果可以避免的话,我觉得它有点难看(zipWith()
意味着两个 real Flux
的数据在我的脑海中,而不是 Mono
被任意缓存和重复的数据 - 因此行为不一定像 "stand out" 那样明显。)相反,我更喜欢 y.flatMapMany(x)
- 它更清楚地表明您正在获取一个值并通过 real Flux
将转换应用于多个值。所以在你的情况下,这可能看起来像:
userScoresDatabaseService.get(userId)
.flatMapMany(c -> activityDatabaseService.getByLocation(location)
.map(a -> scoreActivitiesBasedOnUserCategoryScores(a, c))
)
.sort() //etc.
Flux.sort()
实际上应该是一个 "last resort" 操作,特别是因为您将性能优势作为探索响应式的原因。 (从数据库服务中读取整个数据,然后对其进行排序,然后只取第一个 n
值会导致效率低下——你最好在数据层中对值进行排序和限制。)记住 Flux.sort()
将 有 等待整个源 Flux
在排序和返回值之前完成,将每个值存储在内存中,这样做会丢失很多首先是 Flux
的好处。它还使您的反应链更短更简单,因为它不需要担心排序和限制。
- 尼特:你的一些方法名和变量名看起来很长。如果可能的话,我会缩短它们 - 我发现这使得在反应链的上下文中更容易阅读。
有了以上几点,你的整个 getRecommendedActivities()
可以读成这样:
scoresDb.get(userId)
.flatMapMany(c -> activityDb.getByLocation(location, limit, comparator)
.map(a -> score(a, c).getActivityId())
)
...至少对我来说,读起来更短更简单。
我正在努力将响应式编程引入我的公司。我正在构建 activity 推荐系统的简单演示以证明其性能优势。
我遇到的挑战之一是用另一个流的结果丰富一个流中的结果。我有一个工作示例,如下所示,但我不确定这种方法是否存在任何问题。有人可以看看并提供任何可能的改进吗。
public Flux<Integer> getRecommendedActivities(Long userId, String location, Integer limit) {
Flux<ActivityData> activities = activityDatabaseService.getByLocation(location);
Mono<Map<String,BigInteger>> userCategoryScores = userScoresDatabaseService.get(userId);
return activities
.zipWith(userCategoryScores.cache().repeat(), this::scoreActivitiesBasedOnUserCategoryScores)
.sort(compareActivityScoreStrength)
.map(ScoredActivityData::getActivityId)
.take(limit);
}
private ScoredActivityData scoreActivitiesBasedOnUserCategoryScores(ActivityData deal,Map<String, BigInteger> categoryScores){
//This method combines the deal score and user category scores to come up with a final score
}
谢谢, 卡尔
你那里的代码没有本质上的错误。一些可能有用也可能没用的文体要点:
- 响应式编程的 "norm" 是使用流畅的风格并在整个过程中内联所有内容,而不是在方法顶部声明单独的局部变量并在响应链中使用它们。
x.zipWith(y.cache().repeat())
模式可以很好地工作,但如果可以避免的话,我觉得它有点难看(zipWith()
意味着两个 realFlux
的数据在我的脑海中,而不是Mono
被任意缓存和重复的数据 - 因此行为不一定像 "stand out" 那样明显。)相反,我更喜欢y.flatMapMany(x)
- 它更清楚地表明您正在获取一个值并通过 realFlux
将转换应用于多个值。所以在你的情况下,这可能看起来像:userScoresDatabaseService.get(userId) .flatMapMany(c -> activityDatabaseService.getByLocation(location) .map(a -> scoreActivitiesBasedOnUserCategoryScores(a, c)) ) .sort() //etc.
Flux.sort()
实际上应该是一个 "last resort" 操作,特别是因为您将性能优势作为探索响应式的原因。 (从数据库服务中读取整个数据,然后对其进行排序,然后只取第一个n
值会导致效率低下——你最好在数据层中对值进行排序和限制。)记住Flux.sort()
将 有 等待整个源Flux
在排序和返回值之前完成,将每个值存储在内存中,这样做会丢失很多首先是Flux
的好处。它还使您的反应链更短更简单,因为它不需要担心排序和限制。- 尼特:你的一些方法名和变量名看起来很长。如果可能的话,我会缩短它们 - 我发现这使得在反应链的上下文中更容易阅读。
有了以上几点,你的整个 getRecommendedActivities()
可以读成这样:
scoresDb.get(userId)
.flatMapMany(c -> activityDb.getByLocation(location, limit, comparator)
.map(a -> score(a, c).getActivityId())
)
...至少对我来说,读起来更短更简单。