Webflux 链接多个 Mono

Webflux Chaining multiple Mono

我是反应式编程 (Spring webflux) 的新手,想知道如何最好地处理这个用例。 我有一个响应式服务调用 (getAccount) returns Mono 我想将它与另一个服务调用 getBooks returns Mono 和最后一个同步调用 transform 执行某种转换和 returns 类似 Mono 我将如何以 Mono 的形式链接和 return 转换后的数据getAccountgetBooks 为空时记录警告?这是我尝试做的示例的简化版本。

此处提供了一些虚假服务

 public static Mono<String> getAccount(String name){
        return Mono.just(name);
    }
    public static Mono<Set<Book> getBooks(String title){
        return Mono.just(Sets.newHashSet(new Book(title + "One", "Author One"),
            new Book(title +"Two", "Author Two"),
            new Book(title + "Three", "Author Three")));

    }
    public static LibraryBook transform (Book a){
        return new LibraryBook(a.getTitle(), a.getAuthorName(), "someUniqueId");
    }

我想获取给定用户的帐户并找到 he/she 借出的所有书籍并转换这些书籍和 return 值作为 Mono 同时在适当的地方记录警告

这是我的开始

public Mono<Set<LibraryBook>> getBorrowedBooks(String userId) {
     return getAccount(userId)
            .flatMap(account ->  getBooks(account))
            .log()
            .map(books -> books.stream().map(book -> transform(book)).collect(Collectors.toSet()))
          
}

但是,我不确定混合反应式和流是否是一件坏事,而且看起来不对。

更新:

由于您无法修改 getBooks 方法,您可以通过以下方式构造您的 getBorrowedBooks 方法以避免处理流。

注意 - 记录器和异常仅作为示例。您也可以用不同的方式处理空白场景。

public class MyApp {

  private static final Logger LOGGER = LoggerFactory.getLogger(MyApp.class);

  public static void main(String[] args) {

    List<LibraryBook> libraryBooks = getBorrowedBooks("Abhi").collectList().block();
    libraryBooks.forEach(System.out::println);
  }

  public static Mono<String> getAccount(String name) {
    return Mono.just(name);
  }

  public static Mono<Set<Book>> getBooks(String title) {
    return Mono.just(Sets.newHashSet(new Book(title + "One", "Author One"),
        new Book(title + "Two", "Author Two"),
        new Book(title + "Three", "Author Three")));

  }

  public static LibraryBook transform(Book a) {
    return new LibraryBook(a.getTitle(), a.getAuthorName(), "someUniqueId");
  }

  public static Flux<LibraryBook> getBorrowedBooks(String userId) {
    return getAccount(userId)
        .switchIfEmpty(Mono.defer(() -> {
          LOGGER.error("No account found");
          return Mono.error(new NoAccountFoundException());
        }))
        .flatMap(account -> getBooks(account))
        .flatMapMany(Flux::fromIterable)
        .switchIfEmpty(Mono.defer(() -> {
          LOGGER.error("No books found for account");
          return Mono.error(new NoBooksFoundForGivenAccountException());
        }))
        .map(MyApp::transform);
  }

明智地编译这是正确的。但是从逻辑上讲,我认为这是不正确的,因为您没有考虑单声道和通量的定义。

Mono 是 0..1 个元素的流。 Flux 是一个可以发出 0..N 个元素的流。

方法 getBooks(顾名思义)应该为给定的标题发出超过 1 个元素(这里是 Book)。所以它的 return 类型应该是 flux 而不是 collection.

的 Mono

甚至您也可以从 Spring 的响应式存储库方法中获取示例:

现在,在反应世界中,删除重复项并将 collection 存储在哈希集中的想法等同于对元素的 Flux 调用 distinct()

所以您的 getBooks 方法应该如下所示:

  public static Flux<Book> getBooks(String title){

    return Flux.just(new Book(title + "One", "Author One"),
        new Book(title +"Two", "Author Two"),
        new Book(title + "Three", "Author Three"))
        .distinct();

  }

而您的 getBorrowedBooks 方法应该如下所示:

  public Flux<LibraryBook> getBorrowedBooks(String userId) {
    return getAccount(userId)
        .flatMapMany(account -> getBooks(account))
        .log()
        .map(book -> transform(book));
  }