如何通过使用 ExecutorService 和 CompletableFuture 控制线程数,return 立即响应 spring 流量中的客户端?

How to return response immediate to client in spring flux by controlling the no of thread using ExecutorService and CompletableFuture?

我需要使用来自 Spring 基于通量的休息服务 API 的非阻塞 io 并行调用两个下游系统。但是第一个下游系统容量是一次10个请求,第二个下游系统是100个。

第一个下游系统输出是第二个下游系统的输入,因此我可以向第二个系统发出更并行的请求以加快流程。

第二个下游系统响应非常大,因此无法在内存中保存具体的所有响应所以立即想return将响应发送给客户端。

前工作流程:

示例代码:

@GetMapping(path = "/stream", produces = MediaType.APPLICATION_STREAM_JSON_VALUE)
public Flux<String> getstream() {

    ExecutorService executor = Executors.newFixedThreadPool(10);

    List<CompletableFuture> list = new ArrayList<>();

    AtomicInteger ai = new AtomicInteger(1);
    RestTemplate restTemplate = new RestTemplate();

    for (int i = 0; i < 100; i++) {
        CompletableFuture<Object> cff = CompletableFuture.supplyAsync(

                () -> ai.getAndAdd(1) + " first downstream web service " +
                        restTemplate.getForObject("http://dummy.restapiexample.com/api/v1/employee/" + ai.get(), String.class)

        ).thenApplyAsync(v -> {

            Random r = new Random();
            Integer in = r.nextInt(1000);

            return v + " second downstream web service  " + in + " " + restTemplate.getForObject("http://dummy.restapiexample.com/api/v1/employee/" + ai.get() + 1, String.class) + " \n";
        }, executor);

        list.add(cff);
    }

    return Flux.fromStream(list.stream().map(m -> {
                try {
                    return m.get().toString();
                } catch (InterruptedException | ExecutionException e) {
                    e.printStackTrace();
                }
                return "";
            })
    );

}

在我收到所有线程完成该过程的响应后,此代码仅适用于前五个线程。但是一旦我从第二个下游系统得到响应,我需要立即得到对客户端的响应。

注意:以上代码没有用二级线程池实现。

提前谢谢你。

如果您使用 Spring-Webflux 构建非阻塞系统,最好在示例中利用 WebClient 的功能。我创建了一个简单的测试应用程序,下面的代码片段对我有用:

private final WebClient w = WebClient.create("http://localhost:8080/call"); // web client for external system


@GetMapping(path = "/stream", produces = MediaType.APPLICATION_STREAM_JSON_VALUE)
public Flux<MyClass> getstream() {
    return Flux
            .range(0, 100) // prepare initial 100 requests
            .window(10) // combine elements in batch of 10 (probably buffer will fit better, have a look)

            // .delayElements(Duration.ofSeconds(5)) for testing purpose you can use this function as well
            .doOnNext(flow -> log.info("Batch of 10 is ready")) // double check tells that batch is ready

            .flatMap(flow -> flow
                    // perform an external async call for each element in batch of 10
                    // they will be executed sequentially but there will not be any performance issues because
                    // calls are async. If you wish you can add .parallel() to the flow to make it parallel
                    .flatMap(element -> w.get().exchange())
                    .map(r -> r.bodyToMono(MyClass.class))
            )

            // subscribe to each response and throw received element further to the stream
            .flatMap(response -> Mono.create(s -> response.subscribe(s::success)))

            .window(1000) // batch of 1000 is ready
            .flatMap(flow -> flow
                    .flatMap(element -> w.get().exchange())
                    .map(r -> r.bodyToMono(MyClass.class))
            )
            .flatMap(response -> Mono.create(s -> response.subscribe(s::success)));
}

public static class MyClass {
    public Integer i;
}

更新:

我准备了一个小应用程序来重现您的案例。您可以在 my repository.

中找到它