Spring 反应式:混合 RestTemplate 和 WebClient
Spring reactive : mixing RestTemplate & WebClient
我有两个端点:/parent
和 /child/{parentId}
我需要 return 所有 Child
的列表
public class Parent {
private long id;
private Child child;
}
public class Child {
private long childId;
private String someAttribute;
}
但是,调用 /child/{parentId}
很慢,所以我尝试这样做:
- 调用
/parent
获取100个父数据,使用异步RestTemplate
- 对于每个父数据,调用
/child/{parentId}
获取详细信息
- 将
/child/{parentId}
的结果调用添加到 resultList
- 当对
/child/{parentId}
的 100 次调用完成后,return resultList
我使用包装器 class,因为大多数端点 returns JSON 的格式为:
{
"next": "String",
"data": [
// parent goes here
]
}
所以我把它包在这个
public class ResponseWrapper<T> {
private List<T> data;
private String next;
}
我写了这段代码,但是resultList总是return个空元素。
实现此目标的正确方法是什么?
public List<Child> getAllParents() {
var endpointParent = StringUtils.join(HOST, "/parent");
var resultList = new ArrayList<Child>();
var responseParent = restTemplate.exchange(endpointParent, HttpMethod.GET, httpEntity,
new ParameterizedTypeReference<ResponseWrapper<Parent>>() {
});
responseParent.getBody().getData().stream().forEach(parent -> {
var endpointChild = StringUtils.join(HOST, "/child/", parent.getId());
// async call due to slow endpoint child
webClient.get().uri(endpointChild).retrieve()
.bodyToMono(new ParameterizedTypeReference<ResponseWrapper<Child>>() {
}).map(wrapper -> wrapper.getData()).subscribe(children -> {
children.stream().forEach(child -> resultList.add(child));
});
});
return resultList;
}
在响应式类型上调用 subscribe
会开始处理,但会立即 returns;此时您无法保证处理已完成。因此,当您的代码片段调用 return resultList
时,WebClient
可能仍在忙于获取内容。
您最好放弃异步 resttemplate(现在已弃用,支持 WebClient)并构建单个管道,如:
public List<Child> getAllParents() {
var endpointParent = StringUtils.join(HOST, "/parent");
var resultList = new ArrayList<Child>();
Flux<Parent> parents = webClient.get().uri(endpointParent)
.retrieve().bodyToMono(ResponseWrapper.class)
.flatMapMany(wrapper -> Flux.fromIterable(wrapper.data));
return parents.flatMap(parent -> {
var endpointChild = StringUtils.join(HOST, "/child/", parent.getId());
return webClient.get().uri(endpointChild).retrieve()
.bodyToMono(new ParameterizedTypeReference<ResponseWrapper<Child>>() {
}).flatMapMany(wrapper -> Flux.fromIterable(wrapper.getData()));
}).collectList().block();
}
默认情况下,parents.flatMap
运算符将处理具有一定并发性的元素(我相信默认情况下为 16 个)。您可以通过使用选定的并发值调用 Flux.flatMap
运算符的另一个变体来选择不同的值。
我有两个端点:/parent
和 /child/{parentId}
我需要 return 所有 Child
public class Parent {
private long id;
private Child child;
}
public class Child {
private long childId;
private String someAttribute;
}
但是,调用 /child/{parentId}
很慢,所以我尝试这样做:
- 调用
/parent
获取100个父数据,使用异步RestTemplate - 对于每个父数据,调用
/child/{parentId}
获取详细信息 - 将
/child/{parentId}
的结果调用添加到resultList
- 当对
/child/{parentId}
的 100 次调用完成后,returnresultList
我使用包装器 class,因为大多数端点 returns JSON 的格式为:
{
"next": "String",
"data": [
// parent goes here
]
}
所以我把它包在这个
public class ResponseWrapper<T> {
private List<T> data;
private String next;
}
我写了这段代码,但是resultList总是return个空元素。 实现此目标的正确方法是什么?
public List<Child> getAllParents() {
var endpointParent = StringUtils.join(HOST, "/parent");
var resultList = new ArrayList<Child>();
var responseParent = restTemplate.exchange(endpointParent, HttpMethod.GET, httpEntity,
new ParameterizedTypeReference<ResponseWrapper<Parent>>() {
});
responseParent.getBody().getData().stream().forEach(parent -> {
var endpointChild = StringUtils.join(HOST, "/child/", parent.getId());
// async call due to slow endpoint child
webClient.get().uri(endpointChild).retrieve()
.bodyToMono(new ParameterizedTypeReference<ResponseWrapper<Child>>() {
}).map(wrapper -> wrapper.getData()).subscribe(children -> {
children.stream().forEach(child -> resultList.add(child));
});
});
return resultList;
}
在响应式类型上调用 subscribe
会开始处理,但会立即 returns;此时您无法保证处理已完成。因此,当您的代码片段调用 return resultList
时,WebClient
可能仍在忙于获取内容。
您最好放弃异步 resttemplate(现在已弃用,支持 WebClient)并构建单个管道,如:
public List<Child> getAllParents() {
var endpointParent = StringUtils.join(HOST, "/parent");
var resultList = new ArrayList<Child>();
Flux<Parent> parents = webClient.get().uri(endpointParent)
.retrieve().bodyToMono(ResponseWrapper.class)
.flatMapMany(wrapper -> Flux.fromIterable(wrapper.data));
return parents.flatMap(parent -> {
var endpointChild = StringUtils.join(HOST, "/child/", parent.getId());
return webClient.get().uri(endpointChild).retrieve()
.bodyToMono(new ParameterizedTypeReference<ResponseWrapper<Child>>() {
}).flatMapMany(wrapper -> Flux.fromIterable(wrapper.getData()));
}).collectList().block();
}
默认情况下,parents.flatMap
运算符将处理具有一定并发性的元素(我相信默认情况下为 16 个)。您可以通过使用选定的并发值调用 Flux.flatMap
运算符的另一个变体来选择不同的值。