运行 异步任务,在 return 通量数据库实体之前
Run Async task, before return flux db entities
我有Flux<URL>
。如何为每个 URL(例如 myWebClient.refresh(URL)
)发出多个并发无效请求,然后(在所有请求完成后)从数据库读取数据和 return Flux<MyAnyEntity>
(对于例如 repo.findAll()
)?
您可以使用 Flux
/Mono
运算符实现:
// get the URIs from somewhere
Flux<URI> uris = //...
Flux<MyAnyEntity> entities = uris
// map each URI to a HTTP client call and do nothing with the response
.flatMap(uri -> webClient.get().uri(uri).exchange().then())
// chain that call with a call on your repository
.thenMany(repo.findAll());
更新:
这段代码自然是异步的,non-blocking所以flatMap
算子中的所有操作都会并发执行,根据消费者沟通的需求(这就是我们说的背压) ).
如果 Reactive Streams Subscriber
request(N)
个元素,那么 N
个请求可能会并发执行。我不认为这不是您想要直接处理的事情,尽管您可以使用窗口运算符进行 micro-bacthing 操作来影响事情。
在这种情况下使用 .subscribeOn(Schedulers.parallel())
不会提高并发性 - as stated in the reference documentation 您应该只将其用于 CPU-bound 工作。
我有Flux<URL>
。如何为每个 URL(例如 myWebClient.refresh(URL)
)发出多个并发无效请求,然后(在所有请求完成后)从数据库读取数据和 return Flux<MyAnyEntity>
(对于例如 repo.findAll()
)?
您可以使用 Flux
/Mono
运算符实现:
// get the URIs from somewhere
Flux<URI> uris = //...
Flux<MyAnyEntity> entities = uris
// map each URI to a HTTP client call and do nothing with the response
.flatMap(uri -> webClient.get().uri(uri).exchange().then())
// chain that call with a call on your repository
.thenMany(repo.findAll());
更新:
这段代码自然是异步的,non-blocking所以flatMap
算子中的所有操作都会并发执行,根据消费者沟通的需求(这就是我们说的背压) ).
如果 Reactive Streams Subscriber
request(N)
个元素,那么 N
个请求可能会并发执行。我不认为这不是您想要直接处理的事情,尽管您可以使用窗口运算符进行 micro-bacthing 操作来影响事情。
在这种情况下使用 .subscribeOn(Schedulers.parallel())
不会提高并发性 - as stated in the reference documentation 您应该只将其用于 CPU-bound 工作。