将传统的调用weclient的循环转换为非阻塞方式

Convert traditional loop of invoking weclient into non blocking way

我是响应式编程的新手,我想将以下代码转换为非阻塞方式。

为了简单起见,我根据我的原始代码创建了一个示例伪代码。任何帮助将不胜感激。

 public Mono<Response> getResponse(List<Provider> providers) {

    for (Provider provider : providers) {
        Response response = provider.invokeHttpCall().block();

        if(response.getMessage() == "Success") {
            return Mono.just(response);
        }

        continue;
    }

    return Mono.empty();
}

provider.invokeHttpCall() 方法

    @Override
    public Mono<Response> invokeHttpCall(){
        WebClient webClient = WebClient.create();
        
        
        return webClient.post()
                .uri("/provider").accept(MediaType.APPLICATION_JSON)
                .retrieve()
                .bodyToMono(Response.class);
    }

我尝试了几种策略来实现这一点,但仍然没有成功。要么调用所有提供程序,要么我需要阻止 webclient 线程。

reactive 是 Stream 的一种。请将其视为 Stream 并对其进行反应式编程。

我给你这样的代码。

  1. 首先,使用 Flux.fromIterable() 从列表创建通量流。
  2. 接下来,使用 flatmap() 和 Lambda 函数将 invoke 发送到另一个新线程中。
  3. 使用方法 filterWhen() 和 Lambda 获得“成功”响应并仅获得第一个“成功”元素。请参阅 filterwhen api 文档
  4. 最后,只需使用 Mono.from() 包裹 Flux,然后 return 包裹 Mono 类型。
public Mono<Response> getResponse(List<Provider> providers) {

    return Mono.from(Flux.fromIterable(providers)
                .flatmap(provider -> 
                    Mono.defer(() -> provider.invokeHttpCall())
                .filterWhen(response -> response.getMessage() == "Success");
}

如果您想查看结果和 println()。 只需使用 .subsribe() 方法执行即可。

getResponse.subsribe(System.out::println);
Flux.fromIterable(providers)
    .concatMap(Provider::invokeHttpCall) // ensures providers are called sequentially
    .filter(response -> response.getMessage().equals("Success"))
    .next()