How to adjust the control flow of the 3 threads?
// Task 1 -- the main thread
SimilarityResponse w2vResponse = questionClassifier.compute(questionInfo);

// Task 2
String sku = questionInfo.getSku();
String question = questionInfo.getQuestion();
Callable<ResponseList> dssmTask = () -> this.dssmCompute(sku, question);
Future<ResponseList> dssmService = executorService.submit(dssmTask);
ResponseList dssmResponse;
try {
    LOGGER.info("start dssm ... {} ", question);
    dssmResponse = dssmService.get(Parameters.getParserTimeLimit(), TimeUnit.MILLISECONDS);
    LOGGER.info("dssmResponse ... {} ", dssmResponse);
} catch (ExecutionException | InterruptedException e) {
    LOGGER.warn("ExecutionException | InterruptedException");
    e.printStackTrace();
} catch (TimeoutException te) {
    dssmService.cancel(true);
    LOGGER.warn("DSSM time out for {} {}", sku, question);
}

// Task 3
Callable<ResponseList> stsTask = () -> this.stsCompute(sku, question);
Future<ResponseList> stsService = executorService.submit(stsTask);
ResponseList stsResponse;
try {
    LOGGER.info("start sts ... {} ", question);
    stsResponse = stsService.get(Parameters.getParserTimeLimit(), TimeUnit.MILLISECONDS);
    LOGGER.info("stsResponse ... {} ", stsResponse);
} catch (ExecutionException | InterruptedException e) {
    LOGGER.warn("ExecutionException | InterruptedException");
    e.printStackTrace();
} catch (TimeoutException te) {
    stsService.cancel(true);
    LOGGER.warn("STS time out for {} {}", sku, question);
}

// Last step == do something for above
SimilarityResponse ensemble = new SimilarityResponse();
return ensemble;
Before running the last step, how can I make sure that Tasks 1-3 have all finished? The current code seems to complete Task 1 and then return directly.
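You should use a CountDownLatch. Create an instance in your main thread and pass it to your tasks (the callables). Each task calls latch.countDown() when it finishes. In the last step of your code, call latch.await() to wait for every task to complete. It would look something like this (if your callables are created as lambdas):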
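final CountDownLatch latch = new CountDownLatch(3);
for (int x = 0; x < 3; x++) {
    service.submit(() -> {
        try {
            // do something
        } finally {
            // count down even if the task throws, so await() cannot hang
            latch.countDown();
        }
    });
}
// in the end wait for tasks to finish (await() throws InterruptedException)
latch.await();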
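For something you can run end to end, here is a minimal self-contained sketch of the same latch idea. The class name LatchSketch, the helper runAndWait, and the three placeholder tasks are hypothetical stand-ins for your w2v, DSSM and STS computations, not part of your code. It assumes Java 9+ (for List.of) and at least one task. It uses the timed await(long, TimeUnit) overload so the caller is never blocked longer than the limit, similar to the timeout you pass to Future.get.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class LatchSketch {

    /**
     * Submits every task first, then waits for all of them together.
     * Returns true if all tasks counted down before the timeout elapsed.
     * Assumes tasks is non-empty (a fixed pool needs at least one thread).
     */
    static boolean runAndWait(List<Runnable> tasks, long timeoutMillis)
            throws InterruptedException {
        ExecutorService pool = Executors.newFixedThreadPool(tasks.size());
        CountDownLatch latch = new CountDownLatch(tasks.size());
        try {
            for (Runnable task : tasks) {
                pool.submit(() -> {
                    try {
                        task.run();
                    } finally {
                        // Always count down, even if the task throws.
                        latch.countDown();
                    }
                });
            }
            // Block until the count reaches zero or the time limit passes.
            return latch.await(timeoutMillis, TimeUnit.MILLISECONDS);
        } finally {
            // Interrupt anything still running and release the pool threads.
            pool.shutdownNow();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        // Thread-safe list standing in for the three responses.
        List<String> results = new CopyOnWriteArrayList<>();
        List<Runnable> tasks = List.of(
                () -> results.add("w2v"),   // placeholder for Task 1
                () -> results.add("dssm"),  // placeholder for Task 2
                () -> results.add("sts"));  // placeholder for Task 3

        boolean finished = runAndWait(tasks, 1000);
        System.out.println("all finished: " + finished
                + ", results collected: " + results.size());
    }
}
```

Note that all tasks are submitted before anything waits. In your code, dssmService.get(...) blocks before Task 3 is even submitted, so Tasks 2 and 3 run one after the other rather than in parallel. Since you already hold Future objects, submitting both tasks first and only then calling get(...) on each would also work without a latch.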