Spring webflux webclient 在评估第一次调用的响应时再次调用
Spring webflux webclient make another call while evaluating response from first call
我正在使用 webclient 调用端点,并希望将我获得的响应映射到另一个对象。在映射该对象时,我想对响应的某些参数进行额外的调用。
我第一次调用returns下面的对象
{
"type": "Collection",
"key": "some.key",
"settings": [
{
"type": "Struct",
"key": "steps",
"value": [
{
"type": "Struct",
"key": "1",
"value": [
{
"type": "String",
"key": "headline",
"value": "someheadline"
},
{
"type": "String",
"key": "subheadline",
"value": "somesubheadline"
},
{
"type": "Link",
"key": "link.to.another.object",
"value": {
"linkType": "Boilerplate",
"key": "configurabletextkey"
}
}
]
}
]
},
{
"type": "Struct",
"key": "commons",
"value": [
{
"type": "String",
"key": "mandatory.fields.text",
"value": "Pflichtfelder"
}
]
}
]
}
我这样映射那个响应:
webClient
.get()
.uri(
uriBuilder ->
uriBuilder
.path(headlessConfig.getEndpoint())
.pathSegment(contentId)
.build())
.retrieve()
.bodyToMono(Collection.class)
.map(response -> {
return getCollectionContent(response);
})
在 getCollectionContent 方法中,我遍历设置数组并从响应中提取数据并将其映射到我的 PageContent 对象。
public class PageContent {
private String pageId;
private List<Message> messages;
}
public class Message {
@NonNull private String key;
@NonNull private String text;
private Boolean containsHtml = false;
}
如果响应包含“String”类型,我将把数据添加到消息对象并将其添加到 PageContent 的列表中。
现在进入正题。如果类型是“Link”,我想像上面那样用 webclient 对同一个端点进行另一个调用以获取该对象的键和文本,从中创建一个消息对象并将其添加到我现有的列表中。
对应的代码如下所示:
webClient
.get()
.uri(
uriBuilder ->
uriBuilder
.path(headlessConfig.getEndpoint())
.pathSegment(contentKey)
.build())
.retrieve()
.bodyToMono(ConfigurableText.class)
.map(
configurableTextResponse -> {
messages.add(
new Message(
prefix + configurableTextResponse.getKey(),
configurableTextResponse.getText(),
true));
return Mono.empty();
})
现在,当我尝试执行此操作时,什么也没有发生,我只是收到 PageContent 对象,而没有 link.
的消息
在 resttemplate 的阻塞方式中,这个逻辑应该可以工作,但我想让它与 webclient 一起工作。
编辑:
遍历列表并提取消息数据的代码:
private PageContent getCollectionContent(Collection response) {
PageContent pageContent = new PageContent();
pageContent.setPageId(response.getKey());
List<Message> messages = new ArrayList<>();
response
.getSettings()
.forEach(
settingsItemsArray -> {
var settingsItemList = (List<?>) settingsItemsArray.getValue();
String prefix = settingsItemsArray.getKey() + ".";
extractMessageText(prefix, (LinkedHashMap<?, ?>) settingsItemList.get(0), messages);
});
pageContent.setMessages(messages);
return pageContent;
}
用于提取 MessageText、进一步迭代或获取 link 类型缺失文本的代码。
private void extractMessageText(
String prefix, LinkedHashMap<?, ?> settingsItem, List<Message> messages) {
String itemKey = (String) settingsItem.get(KEY);
String itemType = (String) settingsItem.get(TYPE);
switch (itemType) {
case "String":
messages.add(new Message(prefix + itemKey, (String) settingsItem.get(VALUE)));
break;
case "Struct":
((List<?>) settingsItem.get(VALUE))
.forEach(
structItems ->
extractMessageText(
prefix + settingsItem.get(KEY) + ".",
(LinkedHashMap<?, ?>) structItems,
messages));
break;
case "Link":
webClient
.get()
.uri(
uriBuilder ->
uriBuilder
.path(headlessConfig.getEndpoint())
.pathSegment(contentKey)
.build())
.retrieve()
.bodyToMono(ConfigurableText.class)
.map(
configurableTextResponse -> {
messages.add(
new Message(
prefix + configurableTextResponse.getKey(),
configurableTextResponse.getText(),
true));
return Mono.empty();
})
break;
default:
break;
}
}
我已经更改了您的一些代码以使其与反应堆模式更加兼容。我已经将递归更改为 expandDeep
并且还使用 Jackson 来解析 JSON。我希望这会给您一些解决问题的想法。
List<Message> messages = Flux
.fromIterable(jsonNode.get("settings"))
//expand the graph into a stream of flat data and track the address of the node with 'prefix'
//expand/exapndDeep operators are alternatives of recursion in project reactor
.expandDeep(parent -> {
String parentPrefix = Optional.ofNullable(parent.get("prefix")).map(JsonNode::asText)
.orElse(parent.get("key").asText());
String type = parent.get("type").asText();
if (type.equals("Struct")) {
return Flux.fromIterable(parent.get("value"))
.cast(ObjectNode.class)
.map(child -> child.put("prefix", parentPrefix + ":" + child.get("key").asText()));
}
return Mono.empty();
})
//we have to choose only leaf nodes aka String and Link nodes
.filter(node -> Arrays.asList("String", "Link").contains(node.get("type").asText()))
//now process expanded leaf nodes
.flatMap(leaf -> {
if ("String".equals(leaf.get("type").asText())) {
return Mono.just(new Message(leaf.get("prefix").asText(), leaf.get("value").asText(), true));
}
if ("Link".equals(leaf.get("type").asText())) {
return webClient
.get()
.uri(
uriBuilder ->
uriBuilder
.pathSegment(leaf.get("key").asText())
.build())
.retrieve()
.bodyToMono(JsonNode.class)
.map(configurableTextResponse -> new Message(
leaf.get("prefix") + configurableTextResponse.get("key").asText(),
configurableTextResponse.get("text").asText(),
true));
}
return Mono.empty();
})
// at this point we are getting stream of the Message objects from the Link/String nodes
//collect them into a list
.collectList()
//we have to subscribe()/block() the mono to actually invoke the pipline.
.block();
您的代码没有执行任何操作的主要原因是您没有订阅 WebClient 管道。
编辑:
改变
.map(response -> {
return getCollectionContent(response);
})
至
.flatMap(response -> {
return getCollectionContent(response);
})
和 return 来自 getCollectionContent(response)
Mono<PageContent> page
类似于:
// at this point we are getting stream of the Message objects from the Link/String nodes
//collect them into a list
.collectList()
.map(messages -> {
PageContent pageContent = new PageContent();
pageContent.setPageId(response.get("pageId").asText());
pageContent.setMessages(messages);
return pageContent;
});
进行这些更改后,您的 getCollectionContent()
将 return 发布者 Mono<PageContent>
将由 flatMap 运营商订阅。
我正在使用 webclient 调用端点,并希望将我获得的响应映射到另一个对象。在映射该对象时,我想对响应的某些参数进行额外的调用。
我第一次调用returns下面的对象
{
"type": "Collection",
"key": "some.key",
"settings": [
{
"type": "Struct",
"key": "steps",
"value": [
{
"type": "Struct",
"key": "1",
"value": [
{
"type": "String",
"key": "headline",
"value": "someheadline"
},
{
"type": "String",
"key": "subheadline",
"value": "somesubheadline"
},
{
"type": "Link",
"key": "link.to.another.object",
"value": {
"linkType": "Boilerplate",
"key": "configurabletextkey"
}
}
]
}
]
},
{
"type": "Struct",
"key": "commons",
"value": [
{
"type": "String",
"key": "mandatory.fields.text",
"value": "Pflichtfelder"
}
]
}
]
}
我这样映射那个响应:
webClient
.get()
.uri(
uriBuilder ->
uriBuilder
.path(headlessConfig.getEndpoint())
.pathSegment(contentId)
.build())
.retrieve()
.bodyToMono(Collection.class)
.map(response -> {
return getCollectionContent(response);
})
在 getCollectionContent 方法中,我遍历设置数组并从响应中提取数据并将其映射到我的 PageContent 对象。
public class PageContent {
private String pageId;
private List<Message> messages;
}
public class Message {
@NonNull private String key;
@NonNull private String text;
private Boolean containsHtml = false;
}
如果响应包含“String”类型,我将把数据添加到消息对象并将其添加到 PageContent 的列表中。
现在进入正题。如果类型是“Link”,我想像上面那样用 webclient 对同一个端点进行另一个调用以获取该对象的键和文本,从中创建一个消息对象并将其添加到我现有的列表中。
对应的代码如下所示:
webClient
.get()
.uri(
uriBuilder ->
uriBuilder
.path(headlessConfig.getEndpoint())
.pathSegment(contentKey)
.build())
.retrieve()
.bodyToMono(ConfigurableText.class)
.map(
configurableTextResponse -> {
messages.add(
new Message(
prefix + configurableTextResponse.getKey(),
configurableTextResponse.getText(),
true));
return Mono.empty();
})
现在,当我尝试执行此操作时,什么也没有发生,我只是收到 PageContent 对象,而没有 link.
的消息在 resttemplate 的阻塞方式中,这个逻辑应该可以工作,但我想让它与 webclient 一起工作。
编辑:
遍历列表并提取消息数据的代码:
private PageContent getCollectionContent(Collection response) {
PageContent pageContent = new PageContent();
pageContent.setPageId(response.getKey());
List<Message> messages = new ArrayList<>();
response
.getSettings()
.forEach(
settingsItemsArray -> {
var settingsItemList = (List<?>) settingsItemsArray.getValue();
String prefix = settingsItemsArray.getKey() + ".";
extractMessageText(prefix, (LinkedHashMap<?, ?>) settingsItemList.get(0), messages);
});
pageContent.setMessages(messages);
return pageContent;
}
用于提取 MessageText、进一步迭代或获取 link 类型缺失文本的代码。
private void extractMessageText(
String prefix, LinkedHashMap<?, ?> settingsItem, List<Message> messages) {
String itemKey = (String) settingsItem.get(KEY);
String itemType = (String) settingsItem.get(TYPE);
switch (itemType) {
case "String":
messages.add(new Message(prefix + itemKey, (String) settingsItem.get(VALUE)));
break;
case "Struct":
((List<?>) settingsItem.get(VALUE))
.forEach(
structItems ->
extractMessageText(
prefix + settingsItem.get(KEY) + ".",
(LinkedHashMap<?, ?>) structItems,
messages));
break;
case "Link":
webClient
.get()
.uri(
uriBuilder ->
uriBuilder
.path(headlessConfig.getEndpoint())
.pathSegment(contentKey)
.build())
.retrieve()
.bodyToMono(ConfigurableText.class)
.map(
configurableTextResponse -> {
messages.add(
new Message(
prefix + configurableTextResponse.getKey(),
configurableTextResponse.getText(),
true));
return Mono.empty();
})
break;
default:
break;
}
}
我已经更改了您的一些代码以使其与反应堆模式更加兼容。我已经将递归更改为 expandDeep
并且还使用 Jackson 来解析 JSON。我希望这会给您一些解决问题的想法。
List<Message> messages = Flux
.fromIterable(jsonNode.get("settings"))
//expand the graph into a stream of flat data and track the address of the node with 'prefix'
//expand/exapndDeep operators are alternatives of recursion in project reactor
.expandDeep(parent -> {
String parentPrefix = Optional.ofNullable(parent.get("prefix")).map(JsonNode::asText)
.orElse(parent.get("key").asText());
String type = parent.get("type").asText();
if (type.equals("Struct")) {
return Flux.fromIterable(parent.get("value"))
.cast(ObjectNode.class)
.map(child -> child.put("prefix", parentPrefix + ":" + child.get("key").asText()));
}
return Mono.empty();
})
//we have to choose only leaf nodes aka String and Link nodes
.filter(node -> Arrays.asList("String", "Link").contains(node.get("type").asText()))
//now process expanded leaf nodes
.flatMap(leaf -> {
if ("String".equals(leaf.get("type").asText())) {
return Mono.just(new Message(leaf.get("prefix").asText(), leaf.get("value").asText(), true));
}
if ("Link".equals(leaf.get("type").asText())) {
return webClient
.get()
.uri(
uriBuilder ->
uriBuilder
.pathSegment(leaf.get("key").asText())
.build())
.retrieve()
.bodyToMono(JsonNode.class)
.map(configurableTextResponse -> new Message(
leaf.get("prefix") + configurableTextResponse.get("key").asText(),
configurableTextResponse.get("text").asText(),
true));
}
return Mono.empty();
})
// at this point we are getting stream of the Message objects from the Link/String nodes
//collect them into a list
.collectList()
//we have to subscribe()/block() the mono to actually invoke the pipline.
.block();
您的代码没有执行任何操作的主要原因是您没有订阅 WebClient 管道。
编辑:
改变
.map(response -> {
return getCollectionContent(response);
})
至
.flatMap(response -> {
return getCollectionContent(response);
})
和 return 来自 getCollectionContent(response)
Mono<PageContent> page
类似于:
// at this point we are getting stream of the Message objects from the Link/String nodes
//collect them into a list
.collectList()
.map(messages -> {
PageContent pageContent = new PageContent();
pageContent.setPageId(response.get("pageId").asText());
pageContent.setMessages(messages);
return pageContent;
});
进行这些更改后,您的 getCollectionContent()
将 return 发布者 Mono<PageContent>
将由 flatMap 运营商订阅。