Project Reactor:用 Mono 的结果丰富 Flux 的结果

Project Reactor: Enriching the result of a Flux with the result of a Mono

我正在努力将响应式编程引入我的公司。我正在构建 activity 推荐系统的简单演示以证明其性能优势。

我遇到的挑战之一是用另一个流的结果丰富一个流中的结果。我有一个工作示例,如下所示,但我不确定这种方法是否存在任何问题。有人可以看看并提供任何可能的改进吗。

    public Flux<Integer> getRecommendedActivities(Long userId, String location, Integer limit) {
        Flux<ActivityData> activities = activityDatabaseService.getByLocation(location);
        Mono<Map<String,BigInteger>> userCategoryScores = userScoresDatabaseService.get(userId);

        return activities
            .zipWith(userCategoryScores.cache().repeat(), this::scoreActivitiesBasedOnUserCategoryScores)
            .sort(compareActivityScoreStrength)
            .map(ScoredActivityData::getActivityId)
            .take(limit);
    }

    private ScoredActivityData scoreActivitiesBasedOnUserCategoryScores(ActivityData deal,Map<String, BigInteger> categoryScores){
        //This method combines the deal score and user category scores to come up with a final score
    }

谢谢, 卡尔

你那里的代码没有本质上的错误。一些可能有用也可能没用的文体要点:

  • 响应式编程的 "norm" 是使用流畅的风格并在整个过程中内联所有内容,而不是在方法顶部声明单独的局部变量并在响应链中使用它们。
  • x.zipWith(y.cache().repeat()) 模式可以很好地工作,但如果可以避免的话,我觉得它有点难看(zipWith() 意味着两个 real Flux 的数据在我的脑海中,而不是 Mono 被任意缓存和重复的数据 - 因此行为不一定像 "stand out" 那样明显。)相反,我更喜欢 y.flatMapMany(x) - 它更清楚地表明您正在获取一个值并通过 real Flux 将转换应用于多个值。所以在你的情况下,这可能看起来像:

    userScoresDatabaseService.get(userId)
            .flatMapMany(c -> activityDatabaseService.getByLocation(location)
                    .map(a -> scoreActivitiesBasedOnUserCategoryScores(a, c))
            )
            .sort() //etc.
    
  • Flux.sort() 实际上应该是一个 "last resort" 操作,特别是因为您将性能优势作为探索响应式的原因。 (从数据库服务中读取整个数据,然后对其进行排序,然后只取第一个 n 值会导致效率低下——你最好在数据层中对值进行排序和限制。)记住 Flux.sort() 等待整个源 Flux 在排序和返回值之前完成,将每个值存储在内存中,这样做会丢失很多首先是 Flux 的好处。它还使您的反应链更短更简单,因为它不需要担心排序和限制。
  • 尼特:你的一些方法名和变量名看起来很长。如果可能的话,我会缩短它们 - 我发现这使得在反应链的上下文中更容易阅读。

有了以上几点,你的整个 getRecommendedActivities() 可以读成这样:

scoresDb.get(userId)
        .flatMapMany(c -> activityDb.getByLocation(location, limit, comparator)
                .map(a -> score(a, c).getActivityId())
        )

...至少对我来说,读起来更短更简单。