CompletableFuture with timeout
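I only recently started using CompletableFuture, and I have run into a problem: I have N requests to process.
Each request should be sent to 2 different endpoints, and the two results should be compared as JSON. Since I have a large number of requests and I don't know how long each one takes, I want to limit the time spent waiting for a result, for example to 3 seconds or so.
So I wrote this test code: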
    public class MainTest {
        private static final Logger logger = LoggerFactory.getLogger(MainTest.class);
        private Instant start;

        public static void main(String[] args) {
            MainTest main = new MainTest();
            main.start();
        }

        public void start(){
            String req1 = "http://localhost:8080/testing";
            String req2 = "http://127.0.0.1:8095/testing2";
            ExecutorService exec = Executors.newCachedThreadPool();
            start = Instant.now();
            CompletableFuture<String> comp1 = CompletableFuture.supplyAsync(() -> doReq(req1), exec);
            CompletableFuture<String> comp2 = CompletableFuture.supplyAsync(() -> doReq(req2), exec);
            List<CompletableFuture<String>> completables = List.of(comp1,comp2);
            logger.info("Waiting completables");
            CompletableFuture<List<String>> a = allOf(completables);
            List<String> r = new ArrayList<>();
            try {
                r = a.get(3, TimeUnit.SECONDS);
            } catch (InterruptedException e) {
                e.printStackTrace();
            } catch (ExecutionException e) {
                e.printStackTrace();
            } catch (TimeoutException e) {
                e.printStackTrace();
            } finally {
                Instant end = Instant.now();
                logger.info(" Took: " + DurationFormatUtils.formatDurationHMS(Duration.between(start, end).toMillis()));
                System.out.println(r.size());
                r.forEach(System.out::println);
            }
            exec.shutdown();
        }

        public String doReq(String request){
            AtomicReference<String> response = new AtomicReference<>("default");
            try{
                logger.info("Sending request: {}", request);
                Unirest.get(request).asJson()
                        .ifSuccess(r -> {
                            response.set(r.getBody().toString());
                        })
                        .ifFailure(r -> {
                            logger.error("Oh No! Status" + r.getStatus());
                            r.getParsingError().ifPresent(e -> {
                                logger.error("Parsing Exception: ", e);
                                logger.error("Original body: " + e.getOriginalBody());
                            });
                        });
            } catch (Exception e) {
                logger.error("Error on request! {}", e.getMessage());
            }
            return response.get();
        }

        public <T> CompletableFuture<List<T>> allOf(List<CompletableFuture<T>> futuresList) {
            CompletableFuture<Void> allFuturesResult = CompletableFuture.allOf(futuresList.toArray(new CompletableFuture[0]));
            return allFuturesResult.thenApply(v ->
                    futuresList.stream().
                            map(CompletableFuture::join).
                            collect(Collectors.<T>toList())
            );
        }
    }
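The problem appears when any one of the requests takes more than 3 seconds. I want to get the results of the requests that did finish in time.
I deliberately added a 7-second delay to one of the requests on my web service, and I get the following output. One of the requests had time to finish, but its result is not in the list: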
2020-12-09T17:05:03,878 [pool-2-thread-2] INFO (MainTest:85) - Sending request: http://127.0.0.1:8095/testing2
2020-12-09T17:05:03,878 [pool-2-thread-1] INFO (MainTest:85) - Sending request: http://localhost:8080/testing
2020-12-09T17:05:03,878 [main] INFO (MainTest:53) - Waiting completables
java.util.concurrent.TimeoutException
at java.base/java.util.concurrent.CompletableFuture.timedGet(CompletableFuture.java:1886)
at java.base/java.util.concurrent.CompletableFuture.get(CompletableFuture.java:2021)
at me.testing.MainTest.start(MainTest.java:60)
at me.testing.MainTest.main(MainTest.java:31)
2020-12-09T17:05:06,889 [main] INFO (MainTest:69) - Took: 00:00:03.009
0
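If the wait times out, you should collect the values from the futures that have already completed.
It can be something like this: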
    public <T> List<T> getAllCompleted(List<CompletableFuture<T>> futuresList, long timeout, TimeUnit unit) {
        CompletableFuture<Void> allFuturesResult = CompletableFuture.allOf(futuresList.toArray(new CompletableFuture[futuresList.size()]));
        try {
            allFuturesResult.get(timeout, unit);
        } catch (Exception e) {
            // you may log it
        }
        return futuresList
                .stream()
                .filter(future -> future.isDone() && !future.isCompletedExceptionally()) // keep only the ones completed
                .map(CompletableFuture::join) // get the value from the completed future
                .collect(Collectors.<T>toList()); // collect as a list
    }
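As background on why the code in the question prints 0: the future returned by CompletableFuture.allOf completes only once every input has completed, so the timed get throws TimeoutException before r is ever assigned. The individual futures are not affected by that timeout and can still be inspected. The following is a minimal sketch of that behaviour, assuming sleeps in place of the real requests; AllOfTimeoutDemo, delayed and timesOut are hypothetical names used only for illustration.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

class AllOfTimeoutDemo {

    // Hypothetical stand-in for doReq: yields `value` after sleeping `millis`.
    static CompletableFuture<String> delayed(String value, long millis) {
        return CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(millis);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            return value;
        });
    }

    // Returns true if the timed wait on the combined future ran out of time.
    static boolean timesOut(CompletableFuture<?> combined, long timeout, TimeUnit unit) {
        try {
            combined.get(timeout, unit);
            return false;
        } catch (TimeoutException e) {
            return true;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        CompletableFuture<String> fast = delayed("response1", 200);
        CompletableFuture<String> slow = delayed("response2", 5000);
        CompletableFuture<Void> all = CompletableFuture.allOf(fast, slow);

        // The combined future is still pending after 1 second...
        System.out.println("combined timed out: " + timesOut(all, 1, TimeUnit.SECONDS));
        // ...but the fast input finished on its own and its value is available.
        System.out.println("fast done: " + fast.isDone() + ", value: " + fast.getNow(null));
        System.out.println("slow done: " + slow.isDone());
    }
}
```

This is why getAllCompleted reads from futuresList after the timed wait rather than from the combined future.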
Here is a complete working example. I just replaced doReq with a sleep because I don't have your web service:
    import java.time.Duration;
    import java.time.Instant;
    import java.util.List;
    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.TimeUnit;
    import java.util.stream.Collectors;

    import org.apache.commons.lang3.time.DurationFormatUtils;

    public class MainTest {
        private Instant start;

        public static void main(String[] args) {
            MainTest main = new MainTest();
            main.start();
        }

        public void start() {
            String req1 = "http://localhost:8080/testing";
            String req2 = "http://127.0.0.1:8095/testing2";
            ExecutorService exec = Executors.newCachedThreadPool();
            start = Instant.now();
            CompletableFuture<String> comp1 = CompletableFuture.supplyAsync(() -> doReq(req1), exec);
            CompletableFuture<String> comp2 = CompletableFuture.supplyAsync(() -> doReq(req2), exec);
            List<CompletableFuture<String>> completables = List.of(comp1, comp2);
            System.out.println("Waiting completables");
            List<String> r = getAllCompleted(completables, 3, TimeUnit.SECONDS);
            Instant end = Instant.now();
            System.out.println(" Took: " + DurationFormatUtils.formatDurationHMS(Duration.between(start, end).toMillis()));
            System.out.println(r.size());
            r.forEach(System.out::println);
            exec.shutdown();
        }

        // Simulated request: the "localhost" URL answers after 2 seconds, the other after 5
        public String doReq(String request) {
            if (request.contains("localhost")) {
                try {
                    Thread.sleep(2000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                return "response1";
            }
            try {
                Thread.sleep(5000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "response2";
        }

        public <T> List<T> getAllCompleted(List<CompletableFuture<T>> futuresList, long timeout, TimeUnit unit) {
            CompletableFuture<Void> allFuturesResult = CompletableFuture.allOf(futuresList.toArray(new CompletableFuture[futuresList.size()]));
            try {
                allFuturesResult.get(timeout, unit);
            } catch (Exception e) {
                e.printStackTrace();
            }
            return futuresList.stream()
                    .filter(future -> future.isDone() && !future.isCompletedExceptionally()) // keep only the ones completed
                    .map(CompletableFuture::join) // get the value from the completed future
                    .collect(Collectors.<T>toList()); // collect as a list
        }
    }
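A possible alternative, if you are on Java 9 or later (the List.of call in the question suggests you are), is to give each future its own deadline with CompletableFuture.completeOnTimeout, so that every future ends up with either its real value or a fallback. This is only a sketch under that assumption, not a replacement for the approach above; FallbackOnTimeout, fakeRequest, collectWithFallback and the TIMED_OUT marker are hypothetical names, and the sleeps again stand in for the real requests.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

class FallbackOnTimeout {

    // Hypothetical marker stored for requests that did not finish in time.
    static final String TIMED_OUT = "timed out";

    // Hypothetical stand-in for doReq: yields `body` after sleeping `delayMillis`.
    static CompletableFuture<String> fakeRequest(String body, long delayMillis) {
        return CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(delayMillis);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            return body;
        });
    }

    // Arms a timeout on every future first, then joins them in order.
    // Each join returns within roughly `timeout`, because a future that is
    // still pending at the deadline is completed with TIMED_OUT instead.
    static List<String> collectWithFallback(List<CompletableFuture<String>> futures,
                                            long timeout, TimeUnit unit) {
        List<CompletableFuture<String>> bounded = futures.stream()
                .map(f -> f.completeOnTimeout(TIMED_OUT, timeout, unit))
                .collect(Collectors.toList());
        return bounded.stream()
                .map(CompletableFuture::join)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<String> results = collectWithFallback(
                List.of(fakeRequest("response1", 200), fakeRequest("response2", 5000)),
                1, TimeUnit.SECONDS);
        System.out.println(results);
    }
}
```

Unlike getAllCompleted, this keeps one entry per request in the original order, which may make it easier to pair the two endpoint results for comparison. Note that completeOnTimeout only completes the future; it does not stop the underlying request, which keeps running in the background.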