如何迭代 Flux 并与 Mono 混合
How to iterate Flux and mix with Mono
我有一个用例,我应该向用户发送电子邮件。
首先我创建电子邮件正文。
Mono<String> emailBody = ...cache();
然后我 select 用户并将电子邮件发送给他们:
Flux.fromIterable(userRepository.findAllByRole(Role.USER))
.map(User::getEmail)
.doOnNext(email -> sendEmail(email, emailBody.block(), massSendingSubject))
.subscribe();
我不喜欢什么
- 没有 cache() 方法 emailBody Mono 在每个迭代步骤中计算。
- 为了获取 emailBody 值,我使用 emailBody.block() 但也许有一种反应方式而不是在 Flux 流程中调用块方法?
此代码示例中存在几个问题。
我假设这是一个响应式 Web 应用程序。
首先,不清楚您是如何创建电子邮件正文的;你是从数据库还是远程服务中获取东西?如果它主要是 CPU 绑定(而不是 I/O),那么你不需要将它包装成反应类型。现在,如果它应该包装在 Publisher
中并且电子邮件内容对所有用户都是相同的,那么使用 cache
运算符是一个不错的选择。
此外,Flux.fromIterable(userRepository.findAllByRole(Role.USER))
建议您从反应上下文调用阻塞存储库。
你应该永远不要在doOn***
运算符中做繁重的I/O操作。这些是为日志记录或轻微的副作用操作而制作的。您需要 .block()
的事实是您将阻塞整个反应管道的另一个线索。
最后一条:您不应该在 Web 应用程序的任何地方调用 subscribe
。如果它绑定到 HTTP 请求,则基本上是在不保证资源或完成的情况下触发反应式管道。调用 subscribe
触发管道但不会等到它完成(此方法 returns a Disposable
)。
更多 "typical" 示例如下:
Flux<User> users = userRepository.findAllByRole(Role.USER);
String emailBody = emailContentGenerator.createEmail();
// sendEmail() should return Mono<Void> to signal when the send operation is done
Mono<Void> sendEmailsOperation = users
.flatMap(user -> sendEmail(user.getEmail(), emailBody, subject))
.then();
// something else should subscribe to that reactive type,
// you could plug that as a return value of a Controller for example
如果您不知何故受阻于阻塞组件(例如 sendEmail
组件),您应该在特定调度程序上安排这些阻塞操作,以避免阻塞整个反应管道。为此,请查看 Schedulers section on the reactor reference documentation.
我有一个用例,我应该向用户发送电子邮件。 首先我创建电子邮件正文。
Mono<String> emailBody = ...cache();
然后我 select 用户并将电子邮件发送给他们:
Flux.fromIterable(userRepository.findAllByRole(Role.USER))
.map(User::getEmail)
.doOnNext(email -> sendEmail(email, emailBody.block(), massSendingSubject))
.subscribe();
我不喜欢什么
- 没有 cache() 方法 emailBody Mono 在每个迭代步骤中计算。
- 为了获取 emailBody 值,我使用 emailBody.block() 但也许有一种反应方式而不是在 Flux 流程中调用块方法?
此代码示例中存在几个问题。 我假设这是一个响应式 Web 应用程序。
首先,不清楚您是如何创建电子邮件正文的;你是从数据库还是远程服务中获取东西?如果它主要是 CPU 绑定(而不是 I/O),那么你不需要将它包装成反应类型。现在,如果它应该包装在 Publisher
中并且电子邮件内容对所有用户都是相同的,那么使用 cache
运算符是一个不错的选择。
此外,Flux.fromIterable(userRepository.findAllByRole(Role.USER))
建议您从反应上下文调用阻塞存储库。
你应该永远不要在doOn***
运算符中做繁重的I/O操作。这些是为日志记录或轻微的副作用操作而制作的。您需要 .block()
的事实是您将阻塞整个反应管道的另一个线索。
最后一条:您不应该在 Web 应用程序的任何地方调用 subscribe
。如果它绑定到 HTTP 请求,则基本上是在不保证资源或完成的情况下触发反应式管道。调用 subscribe
触发管道但不会等到它完成(此方法 returns a Disposable
)。
更多 "typical" 示例如下:
Flux<User> users = userRepository.findAllByRole(Role.USER);
String emailBody = emailContentGenerator.createEmail();
// sendEmail() should return Mono<Void> to signal when the send operation is done
Mono<Void> sendEmailsOperation = users
.flatMap(user -> sendEmail(user.getEmail(), emailBody, subject))
.then();
// something else should subscribe to that reactive type,
// you could plug that as a return value of a Controller for example
如果您不知何故受阻于阻塞组件(例如 sendEmail
组件),您应该在特定调度程序上安排这些阻塞操作,以避免阻塞整个反应管道。为此,请查看 Schedulers section on the reactor reference documentation.