运行 异步任务,在 return 通量数据库实体之前

Run Async task, before return flux db entities

我有Flux<URL>。如何为每个 URL(例如 myWebClient.refresh(URL))发出多个并发无效请求,然后(在所有请求完成后)从数据库读取数据和 return Flux<MyAnyEntity>(对于例如 repo.findAll())?

您可以使用 Flux/Mono 运算符实现:

// get the URIs from somewhere
Flux<URI> uris = //...

Flux<MyAnyEntity> entities = uris
                // map each URI to a HTTP client call and do nothing with the response
                .flatMap(uri -> webClient.get().uri(uri).exchange().then())
                // chain that call with a call on your repository
                .thenMany(repo.findAll());

更新:

这段代码自然是异步的,non-blocking所以flatMap算子中的所有操作都会并发执行,根据消费者沟通的需求(这就是我们说的背压) ).

如果 Reactive Streams Subscriber request(N) 个元素,那么 N 个请求可能会并发执行。我不认为这不是您想要直接处理的事情,尽管您可以使用窗口运算符进行 micro-bacthing 操作来影响事情。

在这种情况下使用 .subscribeOn(Schedulers.parallel()) 不会提高并发性 - as stated in the reference documentation 您应该只将其用于 CPU-bound 工作。