如何使用订阅方法 return async/reactive webclient 请求中的 Flux

How to return a Flux in async/reactive webclient request with subscribe method

我正在使用 spring 六边形架构(端口和适配器),因为我的应用程序需要从源主题读取数据流,process/transforms 数据,并将其发送到目标主题。

我的应用程序需要执行以下操作。

  1. 读取数据(会回调url)
  2. 使用传入数据中的 url 进行 http 调用(使用 webclient)
  3. 获取实际数据,需要将其转换为另一种格式。
  4. 将转换后的数据发送到传出主题。

这是我的代码,

public Flux<TargeData> getData(Flux<Message<EventInput>> message) 
{
    return message
        .flatMap(it -> {
            Event event = objectMapper.convertValue(it.getPayload(), Event.class);
            String eventType = event.getHeader().getEventType();
            String callBackURL = "";
            if (DISTRIBUTOR.equals(eventType)) {
                callBackURL = event.getHeader().getCallbackEnpoint();
                WebClient client = WebClient.create();
                Flux<NodeInput> nodeInputFlux = client.get()
                    .uri(callBackURL)
                    .headers(httpHeaders -> {
                        httpHeaders.setContentType(MediaType.APPLICATION_JSON);
                        List<MediaType> acceptTypes = new ArrayList<>();
                        acceptTypes.add(MediaType.APPLICATION_JSON);
                        httpHeaders.setAccept(acceptTypes);
                    })
                .exchangeToFlux(response -> {
                    if (response.statusCode()
                            .equals(HttpStatus.OK)) {
                        System.out.println("Response is OK");
                        return response.bodyToFlux(NodeInput.class);
                    }
                    return Flux.empty();
                });
                nodeInputFlux.subscribe( nodeInput -> {
                    SourceData source = objectMapper.convertValue(nodeInput, SourceData.class);
                //  return Flux.fromIterable(this.TransformImpl.transform(source));
                });
            }
        return Flux.empty();
    });
}

上面代码中的注释行给出了编译,因为 subscribe 方法不允许 return 类型。

我需要一个“不使用块”的解决方案。

请帮助我,在此先感谢。

我想我理解了其中的逻辑。你可能想要的是这个:


public Flux<TargeData> getData(Flux<Message<EventInput>> message) {
    return message
        .flatMap(it -> {
          // 1. marshall and unmarshall operations are CPU expensive and could harm event loop
          return Mono.fromCallable(() -> objectMapper.convertValue(it.getPayload(), Event.class))
              .subscribeOn(Schedulers.parallel());
        })
        .filter(event -> {
          // 2. Moving the if-statement yours to a filter - same behavior
          String eventType = event.getHeader().getEventType();
          return DISTRIBUTOR.equals(eventType);
        })
        // Here is the trick 1 - your request below return Flux of SourceData the we will flatten
        // into a single Flux<SourceData> instead of Flux<List<SourceData>> with flatMapMany
        .flatMap(event -> {
          // This WebClient should not be created here. Should be a singleton injected on your class
          WebClient client = WebClient.create();

          return client.get()
              .uri(event.getHeader().getCallbackEnpoint())
              .accept(MediaType.APPLICATION_JSON)
              .exchangeToFlux(response -> {
                if (response.statusCode().equals(HttpStatus.OK)) {
                  System.out.println("Response is OK");
                  return response.bodyToFlux(SourceData.class);
                }
                return Flux.empty();
              });
        })
        // Here is the trick 2 - supposing that transform return a Iterable of TargetData, then you should do this and will have Flux<TargetData>
        // and flatten instead of Flux<List<TargetData>>
        .flatMapIterable(source -> this.TransformImpl.transform(source));
}