Spring webflux webclient 在评估第一次调用的响应时再次调用

Spring webflux webclient make another call while evaluating response from first call

我正在使用 webclient 调用端点,并希望将我获得的响应映射到另一个对象。在映射该对象时,我想对响应的某些参数进行额外的调用。

我第一次调用returns下面的对象

{
  "type": "Collection",
  "key": "some.key",
  "settings": [
    {
      "type": "Struct",
      "key": "steps",
      "value": [
        {
          "type": "Struct",
          "key": "1",
          "value": [
            {
              "type": "String",
              "key": "headline",
              "value": "someheadline"
            },
            {
              "type": "String",
              "key": "subheadline",
              "value": "somesubheadline"
            },
            {
              "type": "Link",
              "key": "link.to.another.object",
              "value": {
                "linkType": "Boilerplate",
                "key": "configurabletextkey"
              }
            }
          ]
        }
      ]
    },
    {
      "type": "Struct",
      "key": "commons",
      "value": [
        {
          "type": "String",
          "key": "mandatory.fields.text",
          "value": "Pflichtfelder"
        }
      ]
    }
  ]
}

我这样映射那个响应:

webClient
        .get()
        .uri(
            uriBuilder ->
                uriBuilder
                    .path(headlessConfig.getEndpoint())
                    .pathSegment(contentId)
                    .build())
        .retrieve()
        .bodyToMono(Collection.class)
        .map(response -> {
                  return getCollectionContent(response);
                })

在 getCollectionContent 方法中,我遍历设置数组并从响应中提取数据并将其映射到我的 PageContent 对象。

public class PageContent {

  private String pageId;

  private List<Message> messages;
}
public class Message {

  @NonNull private String key;

  @NonNull private String text;

  private Boolean containsHtml = false;
}

如果响应包含“String”类型,我将把数据添加到消息对象并将其添加到 PageContent 的列表中。

现在进入正题。如果类型是“Link”,我想像上面那样用 webclient 对同一个端点进行另一个调用以获取该对象的键和文本,从中创建一个消息对象并将其添加到我现有的列表中。

对应的代码如下所示:

webClient
        .get()
        .uri(
            uriBuilder ->
                uriBuilder
                    .path(headlessConfig.getEndpoint())
                    .pathSegment(contentKey)
                    .build())
        .retrieve()
        .bodyToMono(ConfigurableText.class)
        .map(
                configurableTextResponse -> {
                  messages.add(
                      new Message(
                          prefix + configurableTextResponse.getKey(),
                          configurableTextResponse.getText(),
                          true));
                  return Mono.empty();
                })

现在,当我尝试执行此操作时,什么也没有发生,我只是收到 PageContent 对象,而没有 link.

的消息

在 resttemplate 的阻塞方式中,这个逻辑应该可以工作,但我想让它与 webclient 一起工作。

编辑:

遍历列表并提取消息数据的代码:

private PageContent getCollectionContent(Collection response) {

    PageContent pageContent = new PageContent();
    pageContent.setPageId(response.getKey());

    List<Message> messages = new ArrayList<>();

    response
        .getSettings()
        .forEach(
            settingsItemsArray -> {
              var settingsItemList = (List<?>) settingsItemsArray.getValue();
              String prefix = settingsItemsArray.getKey() + ".";

              extractMessageText(prefix, (LinkedHashMap<?, ?>) settingsItemList.get(0), messages);
            });

    pageContent.setMessages(messages);

    return pageContent;
  }

用于提取 MessageText、进一步迭代或获取 link 类型缺失文本的代码。

private void extractMessageText(
      String prefix, LinkedHashMap<?, ?> settingsItem, List<Message> messages) {

    String itemKey = (String) settingsItem.get(KEY);
    String itemType = (String) settingsItem.get(TYPE);

    switch (itemType) {
      case "String":
        messages.add(new Message(prefix + itemKey, (String) settingsItem.get(VALUE)));
        break;
      case "Struct":
        ((List<?>) settingsItem.get(VALUE))
            .forEach(
                structItems ->
                    extractMessageText(
                        prefix + settingsItem.get(KEY) + ".",
                        (LinkedHashMap<?, ?>) structItems,
                        messages));
        break;
      case "Link":
        webClient
        .get()
        .uri(
            uriBuilder ->
                uriBuilder
                    .path(headlessConfig.getEndpoint())
                    .pathSegment(contentKey)
                    .build())
        .retrieve()
        .bodyToMono(ConfigurableText.class)
        .map(
                configurableTextResponse -> {
                  messages.add(
                      new Message(
                          prefix + configurableTextResponse.getKey(),
                          configurableTextResponse.getText(),
                          true));
                  return Mono.empty();
                })
        break;
      default:
        break;
    }
  }

我已经更改了您的一些代码以使其与反应堆模式更加兼容。我已经将递归更改为 expandDeep 并且还使用 Jackson 来解析 JSON。我希望这会给您一些解决问题的想法。

List<Message> messages = Flux
                .fromIterable(jsonNode.get("settings"))
                //expand the graph into a stream of flat data and track the address of the node with 'prefix'
                //expand/exapndDeep operators are alternatives of recursion in project reactor
                .expandDeep(parent -> {
                    String parentPrefix = Optional.ofNullable(parent.get("prefix")).map(JsonNode::asText)
                            .orElse(parent.get("key").asText());
                    String type = parent.get("type").asText();
                    if (type.equals("Struct")) {
                        return Flux.fromIterable(parent.get("value"))
                                .cast(ObjectNode.class)
                                .map(child -> child.put("prefix", parentPrefix + ":" + child.get("key").asText()));
                    }
                    return Mono.empty();
                })
                //we have to choose only leaf nodes aka String and Link nodes
                .filter(node -> Arrays.asList("String", "Link").contains(node.get("type").asText()))
                //now process expanded leaf nodes
                .flatMap(leaf -> {
                    if ("String".equals(leaf.get("type").asText())) {
                        return Mono.just(new Message(leaf.get("prefix").asText(), leaf.get("value").asText(), true));
                    }
                    if ("Link".equals(leaf.get("type").asText())) {
                        return webClient
                                .get()
                                .uri(
                                        uriBuilder ->
                                                uriBuilder
                                                        .pathSegment(leaf.get("key").asText())
                                                        .build())
                                .retrieve()
                                .bodyToMono(JsonNode.class)
                                .map(configurableTextResponse -> new Message(
                                        leaf.get("prefix") + configurableTextResponse.get("key").asText(),
                                        configurableTextResponse.get("text").asText(),
                                        true));
                    }
                    return Mono.empty();
                })
                // at this point we are getting stream of the Message objects from the Link/String nodes
                //collect them into a list
                .collectList()
                //we have to subscribe()/block() the mono to actually invoke the pipline.
                .block();

您的代码没有执行任何操作的主要原因是您没有订阅 WebClient 管道。

编辑:

改变

  .map(response -> {
                  return getCollectionContent(response);
                })

.flatMap(response -> {
                      return getCollectionContent(response);
                    })

和 return 来自 getCollectionContent(response) Mono<PageContent> page

类似于:

     // at this point we are getting stream of the Message objects from the Link/String nodes
                    //collect them into a list
                    .collectList()
                    .map(messages -> {
                        PageContent pageContent = new PageContent();
                        pageContent.setPageId(response.get("pageId").asText());
pageContent.setMessages(messages);
                        return pageContent;
                    });

进行这些更改后,您的 getCollectionContent() 将 return 发布者 Mono<PageContent> 将由 flatMap 运营商订阅。