无法将可完成未来的结果获取到响应对象中
Not able to get the result of completable future into the response object
我正在使用 java.util.concurrent 的 CompletableFuture 接口和 Java8 调用四个 API。我想做多个休息电话,合并结果和 return 一个 JSON.
我试过 Future,它对我有用。然后我也想尝试CompletableFuture。
public Map<String, Map<String, Object>> getAllValuesInParallel2(RequestObj requestObj) {
Map<String, Map<String, Object>> response = new HashMap<>();
ExecutorService executor = Executors.newFixedThreadPool(6);
CompletableFuture.supplyAsync(() -> {
List<Template> keys = new ArrayList<>();
keys.add("mani");
return aClient.transform(keys, requestObj);
}, executor).thenApply(s -> {
putIntoResponse(response, s);
return true;
});
CompletableFuture.supplyAsync(() -> {
List<Template> keys = new ArrayList<>();
keys.add("gani");
return bClient.transform(keys, requestObj);
}, executor).thenApply(s -> {
putIntoResponse(response, s);
return true;
});
CompletableFuture.supplyAsync(() -> {
List<Template> keys = new ArrayList<>();
keys.add("priya");
return cClient.transform(keys, requestObj);
}, executor).thenApply(s -> {
putIntoResponse(response, s);
return true;
});
CompletableFuture.supplyAsync(() -> {
List<Template> keys = new ArrayList<>();
keys.add("ravi");
return dClient.transform(keys, requestObj);
}, executor).thenApply(s -> {
putIntoResponse(response, s);
return true;
});
return response;
}
private void putIntoResponse(Map<String, Map<String, Object>> response, List<Map<String, Object>> s) {
if(s.size() > 0) {
for (Map<String, Object> maps : s) {
if (maps != null && maps.containsKey("abcd")) {
String abcd = maps.get("abcd").toString();
if(!response.containsKey(abcd))
response.put(maps.get("abcd").toString(), maps);
else {
for (Map.Entry<String, Object> entry: maps.entrySet()) {
response.get(abcd).put(entry.getKey(), entry.getValue());
}
}
}
}
}
}
我正在调用四个 Apis。我得到一个 Hashmap 列表作为每个 API 的响应。现在,每个 API 的结果需要组合成一个响应,即地图的地图。所以,我想,当我从 API 得到响应时,我将该结果放入哈希图中。
但我得到的是一个空洞的回应。这里可完成的未来调用 API 但服务器不等待调用响应和 returns。
如何让服务器等待?
请建议一种方法,以便我可以使用 completableFuture 执行此用例。
另外,建议一些更清洁的方法。
从api获取的响应:
{
{
"abcd": 1,
"cde": 2
},
{
"abcd": 2,
"cde": 3
}
}
将上面的响应解析为:
{
"1" : {
"abcd": 1,
"cde": 2
},
"2":{
"abcd": 2,
"cde": 3
}
}
我认为你的问题在于 CompletableFuture.supplyAsync()
没有被阻塞,所以你的代码立即向前移动,而不是等待应用异步操作。
代码以异步方式执行,在本例中这意味着,您告诉要执行 CompletableFuture.supplyAsync()
内部的操作,然后您继续前进。 thenApply()
部分仅在 supplyAsync
中的代码完成执行时调用,这很可能发生在您已经返回 response
.
之后
如果你想等所有CompletableFutures
执行完再返回response
,那么你应该使用CompletableFuture.join()
方法。
首先尝试重构您的代码,摆脱 thenApply()
部分,以便您的 ~CompletableFutures` 产生 部分响应。
然后将所有 CompletableFutures
分配给一些变量(在我的示例中为 myFirstFuture
、mySecondFuture
等)。
之后使用 Stream
对所有 CompletableFutures
应用 join()
方法并对每个结果应用 putInResponse
方法。
例如:
public Map<String, Map<String, Object>> getAllValuesInParallel2(RequestObj requestObj) {
Map<String, Map<String, Object>> response = new HashMap<>();
ExecutorService executor = Executors.newFixedThreadPool(6);
CompletableFuture<List<Map<String, Object>>> myFuture1 = CompletableFuture.supplyAsync(() -> {
List<Template> keys = new ArrayList<>();
keys.add("mani");
return aClient.transform(keys, requestObj);
}, executor);
CompletableFuture<List<Map<String, Object>>> myFuture2 = CompletableFuture.supplyAsync(() -> {
List<Template> keys = new ArrayList<>();
keys.add("gani");
return bClient.transform(keys, requestObj);
}, executor);
CompletableFuture<List<Map<String, Object>>> myFuture3 = CompletableFuture.supplyAsync(() -> {
List<Template> keys = new ArrayList<>();
keys.add("priya");
return cClient.transform(keys, requestObj);
}, executor);
CompletableFuture<List<Map<String, Object>>> myFuture4 = CompletableFuture.supplyAsync(() -> {
List<Template> keys = new ArrayList<>();
keys.add("ravi");
return dClient.transform(keys, requestObj);
}, executor);
Stream.of(myFuture1, myFuture2, myFuture3, myFuture4)
.map(CompletableFuture::join)
.filter(Objects::nonNull)
.forEachOrdered(s -> putIntoResponse(response, s));
return response;
}
我正在使用 java.util.concurrent 的 CompletableFuture 接口和 Java8 调用四个 API。我想做多个休息电话,合并结果和 return 一个 JSON.
我试过 Future,它对我有用。然后我也想尝试CompletableFuture。
public Map<String, Map<String, Object>> getAllValuesInParallel2(RequestObj requestObj) {
Map<String, Map<String, Object>> response = new HashMap<>();
ExecutorService executor = Executors.newFixedThreadPool(6);
CompletableFuture.supplyAsync(() -> {
List<Template> keys = new ArrayList<>();
keys.add("mani");
return aClient.transform(keys, requestObj);
}, executor).thenApply(s -> {
putIntoResponse(response, s);
return true;
});
CompletableFuture.supplyAsync(() -> {
List<Template> keys = new ArrayList<>();
keys.add("gani");
return bClient.transform(keys, requestObj);
}, executor).thenApply(s -> {
putIntoResponse(response, s);
return true;
});
CompletableFuture.supplyAsync(() -> {
List<Template> keys = new ArrayList<>();
keys.add("priya");
return cClient.transform(keys, requestObj);
}, executor).thenApply(s -> {
putIntoResponse(response, s);
return true;
});
CompletableFuture.supplyAsync(() -> {
List<Template> keys = new ArrayList<>();
keys.add("ravi");
return dClient.transform(keys, requestObj);
}, executor).thenApply(s -> {
putIntoResponse(response, s);
return true;
});
return response;
}
private void putIntoResponse(Map<String, Map<String, Object>> response, List<Map<String, Object>> s) {
if(s.size() > 0) {
for (Map<String, Object> maps : s) {
if (maps != null && maps.containsKey("abcd")) {
String abcd = maps.get("abcd").toString();
if(!response.containsKey(abcd))
response.put(maps.get("abcd").toString(), maps);
else {
for (Map.Entry<String, Object> entry: maps.entrySet()) {
response.get(abcd).put(entry.getKey(), entry.getValue());
}
}
}
}
}
}
我正在调用四个 Apis。我得到一个 Hashmap 列表作为每个 API 的响应。现在,每个 API 的结果需要组合成一个响应,即地图的地图。所以,我想,当我从 API 得到响应时,我将该结果放入哈希图中。
但我得到的是一个空洞的回应。这里可完成的未来调用 API 但服务器不等待调用响应和 returns。 如何让服务器等待? 请建议一种方法,以便我可以使用 completableFuture 执行此用例。 另外,建议一些更清洁的方法。
从api获取的响应:
{
{
"abcd": 1,
"cde": 2
},
{
"abcd": 2,
"cde": 3
}
}
将上面的响应解析为:
{
"1" : {
"abcd": 1,
"cde": 2
},
"2":{
"abcd": 2,
"cde": 3
}
}
我认为你的问题在于 CompletableFuture.supplyAsync()
没有被阻塞,所以你的代码立即向前移动,而不是等待应用异步操作。
代码以异步方式执行,在本例中这意味着,您告诉要执行 CompletableFuture.supplyAsync()
内部的操作,然后您继续前进。 thenApply()
部分仅在 supplyAsync
中的代码完成执行时调用,这很可能发生在您已经返回 response
.
如果你想等所有CompletableFutures
执行完再返回response
,那么你应该使用CompletableFuture.join()
方法。
首先尝试重构您的代码,摆脱 thenApply()
部分,以便您的 ~CompletableFutures` 产生 部分响应。
然后将所有 CompletableFutures
分配给一些变量(在我的示例中为 myFirstFuture
、mySecondFuture
等)。
之后使用 Stream
对所有 CompletableFutures
应用 join()
方法并对每个结果应用 putInResponse
方法。
例如:
public Map<String, Map<String, Object>> getAllValuesInParallel2(RequestObj requestObj) {
Map<String, Map<String, Object>> response = new HashMap<>();
ExecutorService executor = Executors.newFixedThreadPool(6);
CompletableFuture<List<Map<String, Object>>> myFuture1 = CompletableFuture.supplyAsync(() -> {
List<Template> keys = new ArrayList<>();
keys.add("mani");
return aClient.transform(keys, requestObj);
}, executor);
CompletableFuture<List<Map<String, Object>>> myFuture2 = CompletableFuture.supplyAsync(() -> {
List<Template> keys = new ArrayList<>();
keys.add("gani");
return bClient.transform(keys, requestObj);
}, executor);
CompletableFuture<List<Map<String, Object>>> myFuture3 = CompletableFuture.supplyAsync(() -> {
List<Template> keys = new ArrayList<>();
keys.add("priya");
return cClient.transform(keys, requestObj);
}, executor);
CompletableFuture<List<Map<String, Object>>> myFuture4 = CompletableFuture.supplyAsync(() -> {
List<Template> keys = new ArrayList<>();
keys.add("ravi");
return dClient.transform(keys, requestObj);
}, executor);
Stream.of(myFuture1, myFuture2, myFuture3, myFuture4)
.map(CompletableFuture::join)
.filter(Objects::nonNull)
.forEachOrdered(s -> putIntoResponse(response, s));
return response;
}