单线程 Flux 中的 Mono
Mono in Flux by single thread
我有 Flux<Foo>
来自 db(例如 5 个元素)。
我需要从每个 Foo 获取一些信息,将其全部设置为 Mono<MyRequest>
,发送到另一个休息资源,获取 Mono<MyResponse>
并在每个 Foo 中使用它的所有信息。
我在 Flux.flatMap()
中做了很多 Mono.zipWith()
和 Mono.zipWhen()
,但是创建 MyRequest 和发送到资源由 5 个线程发生了 5 次。
Flux<Foo> flux = dao.getAll();
Flux<Foo> fluxAfterProcessing = flux.flatMap(foo -> monoFilters.map(...));
Mono<Tuple2<MyRequest, MyResponse>> mono =
monoFilters.flatMap(filter -> monoRequest.map(...))
.zipWhen(request -> api.send(request))
.flatMap(tuple -> monoResponseFilters.map(...));
return fluxAfterProcessing.flatMap(foo ->
monoResponseFilters.zipWith(mono).map(...))
如何通过 Flux 中的 1 个线程仅处理一次我的 Mono 函数?
让我们假设这个任务是这样读的:
- 从数据库中获取一些值
- 当所有值都到达时,将它们包装在请求中并发送出去
- 带响应的压缩结果
然后这将我们引向这样的事情:
Flux<Foo> foos = dao.getAll();
Mono<List<Foo>> everything = foos.collectList();
Mono<MyRequest> request = everything
// collect the data into another Mono, then into request
.map(list -> list.stream().map(Foo::getData).collect(toList()))
.map(data -> new MyRequest(data));
return request.zipWhen(request -> api.send(request));
或者,如果您映射初始 foos
:
,您可以更轻松地收集构建请求
Flux<Data> data = dao.getAll().map(Foo::getData);
Mono<MyRequest> request = data.collectList().map(MyRequest::new);
我有 Flux<Foo>
来自 db(例如 5 个元素)。
我需要从每个 Foo 获取一些信息,将其全部设置为 Mono<MyRequest>
,发送到另一个休息资源,获取 Mono<MyResponse>
并在每个 Foo 中使用它的所有信息。
我在 Flux.flatMap()
中做了很多 Mono.zipWith()
和 Mono.zipWhen()
,但是创建 MyRequest 和发送到资源由 5 个线程发生了 5 次。
Flux<Foo> flux = dao.getAll();
Flux<Foo> fluxAfterProcessing = flux.flatMap(foo -> monoFilters.map(...));
Mono<Tuple2<MyRequest, MyResponse>> mono =
monoFilters.flatMap(filter -> monoRequest.map(...))
.zipWhen(request -> api.send(request))
.flatMap(tuple -> monoResponseFilters.map(...));
return fluxAfterProcessing.flatMap(foo ->
monoResponseFilters.zipWith(mono).map(...))
如何通过 Flux 中的 1 个线程仅处理一次我的 Mono 函数?
让我们假设这个任务是这样读的:
- 从数据库中获取一些值
- 当所有值都到达时,将它们包装在请求中并发送出去
- 带响应的压缩结果
然后这将我们引向这样的事情:
Flux<Foo> foos = dao.getAll();
Mono<List<Foo>> everything = foos.collectList();
Mono<MyRequest> request = everything
// collect the data into another Mono, then into request
.map(list -> list.stream().map(Foo::getData).collect(toList()))
.map(data -> new MyRequest(data));
return request.zipWhen(request -> api.send(request));
或者,如果您映射初始 foos
:
Flux<Data> data = dao.getAll().map(Foo::getData);
Mono<MyRequest> request = data.collectList().map(MyRequest::new);