无法将可完成未来的结果获取到响应对象中

Not able to get the result of completable future into the response object

我正在使用 java.util.concurrent 的 CompletableFuture 接口和 Java8 调用四个 API。我想做多个休息电话,合并结果和 return 一个 JSON.

我试过 Future,它对我有用。然后我也想尝试CompletableFuture。

public Map<String, Map<String, Object>> getAllValuesInParallel2(RequestObj requestObj) {

    Map<String, Map<String, Object>> response = new HashMap<>();
    ExecutorService executor = Executors.newFixedThreadPool(6);
    CompletableFuture.supplyAsync(() -> {
        List<Template> keys = new ArrayList<>();
        keys.add("mani");
        return aClient.transform(keys, requestObj);
    }, executor).thenApply(s -> {
        putIntoResponse(response, s);
        return true;
    });

    CompletableFuture.supplyAsync(() -> {
        List<Template> keys = new ArrayList<>();
        keys.add("gani");
        return bClient.transform(keys, requestObj);
    }, executor).thenApply(s -> {
        putIntoResponse(response, s);
        return true;
    });

    CompletableFuture.supplyAsync(() -> {
        List<Template> keys = new ArrayList<>();
        keys.add("priya");
        return cClient.transform(keys, requestObj);
    }, executor).thenApply(s -> {
        putIntoResponse(response, s);
        return true;
    });

    CompletableFuture.supplyAsync(() -> {
        List<Template> keys = new ArrayList<>();
        keys.add("ravi");
        return dClient.transform(keys, requestObj);
    }, executor).thenApply(s -> {
        putIntoResponse(response, s);
        return true;
    });

    return response;
}

private void putIntoResponse(Map<String, Map<String, Object>> response, List<Map<String, Object>> s) {
    if(s.size() > 0) {
        for (Map<String, Object> maps : s) {
            if (maps != null && maps.containsKey("abcd")) {
                String abcd = maps.get("abcd").toString();
                if(!response.containsKey(abcd))
                    response.put(maps.get("abcd").toString(), maps);
                else {
                    for (Map.Entry<String, Object> entry: maps.entrySet()) {
                        response.get(abcd).put(entry.getKey(), entry.getValue());
                    }
                }
            }
        }
    }
}

我正在调用四个 Apis。我得到一个 Hashmap 列表作为每个 API 的响应。现在,每个 API 的结果需要组合成一个响应,即地图的地图。所以,我想,当我从 API 得到响应时,我将该结果放入哈希图中。

但我得到的是一个空洞的回应。这里可完成的未来调用 API 但服务器不等待调用响应和 returns。 如何让服务器等待? 请建议一种方法,以便我可以使用 completableFuture 执行此用例。 另外,建议一些更清洁的方法。

从api获取的响应:

{ 
   {
    "abcd": 1,
    "cde": 2
   },
   { 
    "abcd": 2,
     "cde": 3
   }
}

将上面的响应解析为:

{
   "1" : {
    "abcd": 1,
    "cde": 2
   },
   "2":{ 
    "abcd": 2,
     "cde": 3
   }

}

我认为你的问题在于 CompletableFuture.supplyAsync() 没有被阻塞,所以你的代码立即向前移动,而不是等待应用异步操作。

代码以异步方式执行,在本例中这意味着,您告诉要执行 CompletableFuture.supplyAsync() 内部的操作,然后您继续前进。 thenApply() 部分仅在 supplyAsync 中的代码完成执行时调用,这很可能发生在您已经返回 response.

之后

如果你想等所有CompletableFutures执行完再返回response,那么你应该使用CompletableFuture.join()方法。

首先尝试重构您的代码,摆脱 thenApply() 部分,以便您的 ~CompletableFutures` 产生 部分响应

然后将所有 CompletableFutures 分配给一些变量(在我的示例中为 myFirstFuturemySecondFuture 等)。

之后使用 Stream 对所有 CompletableFutures 应用 join() 方法并对每个结果应用 putInResponse 方法。

例如:

public Map<String, Map<String, Object>> getAllValuesInParallel2(RequestObj requestObj) {

    Map<String, Map<String, Object>> response = new HashMap<>();
    ExecutorService executor = Executors.newFixedThreadPool(6);
    CompletableFuture<List<Map<String, Object>>> myFuture1 = CompletableFuture.supplyAsync(() -> {
        List<Template> keys = new ArrayList<>();
        keys.add("mani");
        return aClient.transform(keys, requestObj);
    }, executor);

    CompletableFuture<List<Map<String, Object>>> myFuture2 = CompletableFuture.supplyAsync(() -> {
        List<Template> keys = new ArrayList<>();
        keys.add("gani");
        return bClient.transform(keys, requestObj);
    }, executor);

    CompletableFuture<List<Map<String, Object>>> myFuture3 = CompletableFuture.supplyAsync(() -> {
        List<Template> keys = new ArrayList<>();
        keys.add("priya");
        return cClient.transform(keys, requestObj);
    }, executor);

    CompletableFuture<List<Map<String, Object>>> myFuture4 = CompletableFuture.supplyAsync(() -> {
        List<Template> keys = new ArrayList<>();
        keys.add("ravi");
        return dClient.transform(keys, requestObj);
    }, executor);

    Stream.of(myFuture1, myFuture2, myFuture3, myFuture4)
        .map(CompletableFuture::join)
        .filter(Objects::nonNull)
        .forEachOrdered(s -> putIntoResponse(response, s));


    return response;
}