Reactor Java Flux 与 Mono

Reactor Java Flux vs Mono

我要实施以下方案。 通过用户 Id 获取作业列表,迭代作业,然后通过属性获取帐户条目,最后更新到帐户。

方法定义

Mono<Void> execute(List<String> users)

Jobs Dao 返回 Mono

Mono<List<JobInfo>> getJobsByUser(String user)
class JobInfo {
   String user;
   String account;
}

正在获取帐户返回 Mono 列表,

Mono<List<Account>> getAccounts(String account)

我正在努力使用 Mono 无阻塞方式来做到这一点。我尝试使用该块来执行此操作,但得到

block()/blockFirst()/blockLast() are blocking, which is not supported in thread reactor-http-nio-
public Mono<Void> execute(List<String> users) {
   users.forEach(user -> {
    jobDao.getJobsByUser(user).block().stream().forEach(job -> {
      accountDam.getAccounts(job.getAccount()).block().forEach(acc -> {
        // update account
      });
    });
   
   });

   return Mono.empty().then();

}

据我了解,您需要以下流程:

public Mono<Void> execute(List<String> users) {
    return Flux.fromIterable(users)
        .flatMap(user -> getJobsByUser(user))
        .flatMapIterable(jobs -> jobs)
        .flatMap(job -> accountDam.getAccounts(job.getAccount()))
        .flatMapIterable(accounts -> accounts)
        .flatMap(account -> updateAccount(account))
        .then();
}