单线程 Flux 中的 Mono

Mono in Flux by single thread

我有 Flux<Foo> 来自 db(例如 5 个元素)。 我需要从每个 Foo 获取一些信息,将其全部设置为 Mono<MyRequest>,发送到另一个休息资源,获取 Mono<MyResponse> 并在每个 Foo 中使用它的所有信息。

我在 Flux.flatMap() 中做了很多 Mono.zipWith()Mono.zipWhen()但是创建 MyRequest 和发送到资源由 5 个线程发生了 5 次。

Flux<Foo> flux = dao.getAll();
Flux<Foo> fluxAfterProcessing = flux.flatMap(foo -> monoFilters.map(...));
Mono<Tuple2<MyRequest, MyResponse>> mono = 
                      monoFilters.flatMap(filter -> monoRequest.map(...))
                                 .zipWhen(request -> api.send(request))
                                 .flatMap(tuple -> monoResponseFilters.map(...));
return fluxAfterProcessing.flatMap(foo -> 
                       monoResponseFilters.zipWith(mono).map(...))

如何通过 Flux 中的 1 个线程仅处理一次我的 Mono 函数?

让我们假设这个任务是这样读的:

  • 从数据库中获取一些值
  • 当所有值都到达时,将它们包装在请求中并发送出去
  • 带响应的压缩结果

然后这将我们引向这样的事情:

Flux<Foo> foos = dao.getAll();
Mono<List<Foo>> everything = foos.collectList();

Mono<MyRequest> request = everything
    // collect the data into another Mono, then into request
    .map(list -> list.stream().map(Foo::getData).collect(toList()))
    .map(data -> new MyRequest(data));

return request.zipWhen(request -> api.send(request));

或者,如果您映射初始 foos:

,您可以更轻松地收集构建请求
Flux<Data> data = dao.getAll().map(Foo::getData);
Mono<MyRequest> request = data.collectList().map(MyRequest::new);