如何在不丢失数据的情况下与不同的发布者建立反应性管道?

How to build a reactive pipeline with different Publishers without losing data?

假设您有这样的代码:

public List<Group> addUserToGroups(String username, String label) {
  Mono<User> userMono = webClient.getUser(username);
  User user = userMono.block();

  Flux<Group> groupsFlux = webClient.getGroups(label);
  List<Group> groups = groupsFlux.collectList().block();

  groups.forEach(group -> 
      webClient.addUserToGroup(user.getId(), group.getId()).block()
  );

  return groups;
}

但现在您想将此代码重构为非阻塞反应式管道,并将主要方法return改成Flux<Group>

所以也许你会开始做这样的事情:

public Flux<Group> addUserToGroups(String username, String label) {
  return webClient.getUser(username)
      .flatMapMany(user -> webClient.getGroups(label))
      ...
}

但现在我们遇到了一个问题,结果 Flux 中的值为 Group 并且我们需要在下一步中丢失的 User 信息。

因此,希望的管道数据流可以这样表示:

start
|
U
| /
|/
G1,U
| \
|  UG1----|
|         | 
G2,U      G1
| \       |
|  UG2----|
|         |
          G2
          |
        result: G1, G2

UGn is the result of calling webClient.addUserToGroup

实现这个的正确方法是什么?

一般来说,当您觉得需要映射到另一个值,但又要保留已有的值时,您有以下三种基本选择:

  • 使用嵌套 flatMap() 调用;
  • 创建一个新对象来保存这两种类型并使用组合(或使用 Tuple);
  • 更改您的对象结构或基础服务,使 "mapped" 对象包含您需要的所有字段。

嵌套 flatMap() 调用的优点是快速简便,但缺点是嵌套级别过多时,代码可能变得几乎不可读。组合解决了这个问题,但是你显然需要创建新类型(或使用 Tuple 类型,这显然描述性较差。)

更改对象结构是两全其美如果这样做有意义,那么值得考虑 - 但大多数时候,根据我的经验,它不是可行或明智。

恕我直言,在这里使用嵌套调用会很好,因为它只是一个嵌套级别 - 类似于:

return webClient.getUser(username)
        .flatMapMany(
                user -> webClient.getGroups(label)
                        .flatMap(group -> webClient.addUserToGroup(user.getId(), group.getId()).thenReturn(group))
        )
        .collectList();

不一定与问题相关,但是我不太喜欢这种方法的第二个方面 - 它实际上做了两件事 - 它正在为用户检索组, 然后触发第二次调用以将用户添加到组中(功能似乎有点奇怪,但我不知道用例。)我建议将这两个功能分成两个单独的方法,如果你可以。