Spring 响应式编程:如何创建发布者的动态列表作为 Flux.merge 的输入
Spring Reactive Programming: How to create a dynamic list of Publishers as input to Flux.merge
我是 Spring 响应式编程的新手,我正在开发一个 returns Flux 的 REST 端点。例如:
@PostMapping
public Flux<MyResponse> processRequests(@RequestBody List<MyRequest> requests) {
return Flux.merge(Arrays.asList(dataSource.processRequest(requests.get(0)), dataSource2.processRequest(requests.get(0)))).parallel()
.runOn(Schedulers.elastic()).sequential();
}
示例代码中的每个数据源(dataSource 和 dataSource2)都实现了如下所示的接口:
public interface MyResponseAdapter {
Flux<MyResponse> processRequest(MyRequest request);
}
此代码工作正常,因为它 returns 符合预期的 Flux,但如您所见,代码仅引用 MyRequest 列表中的第一个元素。我需要做的是为 MyRequest 列表中的每个元素构造 Flux.merge。谁能指出我正确的方向?
我想我找到了一个简单的解决方案:
List<Flux<MyResponse>> results = new ArrayList<>();
for (MyRequest myRequest : requests ) {
results.add(dataSource.processRequest(myRequest));
results.add(dataSource2.processRequest(myRequest));
}
return Flux.merge(results).parallel().runOn(Schedulers.elastic()).sequential();
我是 Spring 响应式编程的新手,我正在开发一个 returns Flux 的 REST 端点。例如:
@PostMapping
public Flux<MyResponse> processRequests(@RequestBody List<MyRequest> requests) {
return Flux.merge(Arrays.asList(dataSource.processRequest(requests.get(0)), dataSource2.processRequest(requests.get(0)))).parallel()
.runOn(Schedulers.elastic()).sequential();
}
示例代码中的每个数据源(dataSource 和 dataSource2)都实现了如下所示的接口:
public interface MyResponseAdapter {
Flux<MyResponse> processRequest(MyRequest request);
}
此代码工作正常,因为它 returns 符合预期的 Flux,但如您所见,代码仅引用 MyRequest 列表中的第一个元素。我需要做的是为 MyRequest 列表中的每个元素构造 Flux.merge。谁能指出我正确的方向?
我想我找到了一个简单的解决方案:
List<Flux<MyResponse>> results = new ArrayList<>();
for (MyRequest myRequest : requests ) {
results.add(dataSource.processRequest(myRequest));
results.add(dataSource2.processRequest(myRequest));
}
return Flux.merge(results).parallel().runOn(Schedulers.elastic()).sequential();