Reactor Java Flux 与 Mono
Reactor Java Flux vs Mono
我要实施以下方案。
通过用户 Id 获取作业列表,迭代作业,然后通过属性获取帐户条目,最后更新到帐户。
方法定义
Mono<Void> execute(List<String> users)
Jobs Dao 返回 Mono
Mono<List<JobInfo>> getJobsByUser(String user)
class JobInfo {
String user;
String account;
}
正在获取帐户返回 Mono 列表,
Mono<List<Account>> getAccounts(String account)
我正在努力使用 Mono 无阻塞方式来做到这一点。我尝试使用该块来执行此操作,但得到
block()/blockFirst()/blockLast() are blocking, which is not supported in thread reactor-http-nio-
public Mono<Void> execute(List<String> users) {
users.forEach(user -> {
jobDao.getJobsByUser(user).block().stream().forEach(job -> {
accountDam.getAccounts(job.getAccount()).block().forEach(acc -> {
// update account
});
});
});
return Mono.empty().then();
}
据我了解,您需要以下流程:
public Mono<Void> execute(List<String> users) {
return Flux.fromIterable(users)
.flatMap(user -> getJobsByUser(user))
.flatMapIterable(jobs -> jobs)
.flatMap(job -> accountDam.getAccounts(job.getAccount()))
.flatMapIterable(accounts -> accounts)
.flatMap(account -> updateAccount(account))
.then();
}
我要实施以下方案。 通过用户 Id 获取作业列表,迭代作业,然后通过属性获取帐户条目,最后更新到帐户。
方法定义
Mono<Void> execute(List<String> users)
Jobs Dao 返回 Mono
Mono<List<JobInfo>> getJobsByUser(String user)
class JobInfo {
String user;
String account;
}
正在获取帐户返回 Mono 列表,
Mono<List<Account>> getAccounts(String account)
我正在努力使用 Mono 无阻塞方式来做到这一点。我尝试使用该块来执行此操作,但得到
block()/blockFirst()/blockLast() are blocking, which is not supported in thread reactor-http-nio-
public Mono<Void> execute(List<String> users) {
users.forEach(user -> {
jobDao.getJobsByUser(user).block().stream().forEach(job -> {
accountDam.getAccounts(job.getAccount()).block().forEach(acc -> {
// update account
});
});
});
return Mono.empty().then();
}
据我了解,您需要以下流程:
public Mono<Void> execute(List<String> users) {
return Flux.fromIterable(users)
.flatMap(user -> getJobsByUser(user))
.flatMapIterable(jobs -> jobs)
.flatMap(job -> accountDam.getAccounts(job.getAccount()))
.flatMapIterable(accounts -> accounts)
.flatMap(account -> updateAccount(account))
.then();
}