使用 CompletableFuture 执行单个或多个 Callables 并避免阻塞
Use CompletableFuture to execute single or multiple Callables and avoid blocking
我通过 ThreadPoolExecutor
执行了一些调用。如果线程列表仅包含 1 个可调用项,那么我直接调用 CallableService
的 call
方法。如果列表包含超过 1 个可调用对象,那么我将通过线程池执行器并行执行所有这些线程。
如何使用 Java 8 CompletableFuture 实现此目的?如果future.get()
增强以避免阻塞,那将是一个加号。
private static ThreadPoolExecutor myThreadPoolExecutor = new ThreadPoolExecutor(0, 100, 5L, TimeUnit.SECONDS, new SynchronousQueue<>());
public static void execute(List<Callable<Boolean>> threadList) throws Exception {
List<Future<Boolean>> futureList = null;
CallableService singleService = (CallableService) threadList.get(0);
if (1 == threadList.size()) {
singleService.call();
}
else {
try {
futureList = myThreadPoolExecutor.invokeAll(threadList);
}
catch (InterruptedException e) {
e.printStackTrace();
}
}
if (null != futureList) {
for (Future<Boolean> future : futureList) {
try {
future.get();
}
catch (Exception e) {
//do some calculations here and then throw exception
throw new Exception(e.getMessage(), e);
}
}
}
}
Future.isDone() 告诉我们执行者是否完成了任务处理。如果任务完成,它将 return 为真,否则,它将 return 为假。
for (Future<Boolean> future : futureList) {
while(!future.isDone())
{
doSOmethingElse();
Thread.sleep(300);//Optional
}
try {
future.get();
}
catch (Exception e)
{
//do some calculations here and then throw exception
throw new Exception(e.getMessage(), e);
}
}
但我们不必担心这一点,因为在确保任务完成后我们到达了调用 get() 的地步。
I execute a few callables through ThreadPoolExecutor. If thread list contains only 1 callable then I directly call call method of my CallableService. If list contains more than 1 callables then I execute all those threads in parallel via thread pool executor.
我猜你已经实现了这部分。 (如果您的作业繁重并且配置了 100 个线程 运行ning,您可能 运行 会遇到内存使用问题。但这是一个不同的问题。)
And if future.get() is enhanced to avoid blocking, that will be a plus.
为此,您可以采用以下方法:
- 创建另一个
ExecutorService
,其工作将只是 运行 Future.get()
电话。
- 将您的
Future.get()
提交到该服务,如下所示。
关闭它并等待终止。
if (null != futureList) {
ExecutorService waitSvc = Executors.newCachedThreadPool();
for (Future<Boolean> future : futureList) {
try {
waitSvc.submit( () -> future.get() );
}
catch (Exception e) {
//do some calculations here and then throw exception
throw new Exception(e.getMessage(), e);
}
}
waitSvc.shutdown(); //This may take some time. You may want to call awaitTermination() after this.
}
但是,我觉得你应该重新设计使用这么多线程的整体方法,除非这只是一个学习应用程序。
不需要 CompletableFuture
,因为您使用 ExecutorService
的方式就足够了,不过,代码流的某些方面可以改进。您获取第一个元素,即使不需要,也可以无缘无故地将其转换为 CallableService
,因为您已经可以通过 Callable
接口调用该方法。在另一个分支中,您正在捕获 InterruptedException
并继续,因此调用者永远不会知道并非所有作业都已执行。在直接的代码流中,您不需要检查 null
:
列表
public static void execute(List<Callable<Boolean>> threadList) throws Exception {
if(1 == threadList.size()) {
Callable<Boolean> singleService = threadList.get(0);
singleService.call();
}
else {
List<Future<Boolean>> futureList = myThreadPoolExecutor.invokeAll(threadList);
for(Future<Boolean> future : futureList) {
try {
future.get();
}
catch(Exception e) {
//do some calculations here and then throw exception
throw new Exception(e.getMessage(), e);
}
}
}
}
您可以进一步缩短为
public static void execute(List<Callable<Boolean>> threadList) throws Exception {
if(1 == threadList.size()) {
threadList.get(0).call();
}
else {
for(Future<Boolean> future : myThreadPoolExecutor.invokeAll(threadList)) {
try {
future.get();
}
catch(Exception e) {
//do some calculations here and then throw exception
throw new Exception(e.getMessage(), e);
}
}
}
}
但这是首选编码风格的问题。但请注意,它引起了我的注意,在单个元素的情况下,您没有执行相同的异常处理。
要使用 CompletableFuture
,我们需要一个适配器方法,因为便捷方法 supplyAsync
需要 Supplier
而不是 Callable
。使用 的修改变体,我们得到
public static void execute(List<Callable<Boolean>> threadList) throws Exception {
if(1 == threadList.size()) {
threadList.get(0).call();
}
else {
CompletableFuture<?> all = CompletableFuture.allOf(
threadList.stream()
.map(c -> callAsync(c, myThreadPoolExecutor))
.toArray(CompletableFuture<?>[]::new));
try {
all.get();
}
catch(Exception e) {
//do some calculations here and then throw exception
throw new Exception(e.getMessage(), e);
}
}
}
public static <R> CompletableFuture<R> callAsync(Callable<R> callable, Executor e) {
CompletableFuture<R> cf = new CompletableFuture<>();
CompletableFuture.runAsync(() -> {
try { cf.complete(callable.call()); }
catch(Throwable ex) { cf.completeExceptionally(ex); }
}, e);
return cf;
}
所以我们没有 invokeAll
负责提交所有作业。我们必须通过循环或流操作手动执行此操作。另一方面,我们通过代表完成状态的 allOf
获得一个单一的未来,如果至少有一个工作失败,则例外。
与等待完成的 invokeAll
不同,allOf
仅 return 未来,因此等待完成的是 all.get()
调用。我们可以在它之前做其他事情,甚至可以使用这个 属性 来始终在调用者线程中执行第一项工作:
public static void execute(List<Callable<Boolean>> threadList) throws Exception {
CompletableFuture<?> tail = CompletableFuture.allOf(
threadList.stream().skip(1)
.map(c -> callAsync(c, myThreadPoolExecutor))
.toArray(CompletableFuture<?>[]::new)),
head = callAsync(threadList.get(0), Runnable::run);
try {
head.get();
tail.get();
}
catch(Exception e) {
//do some calculations here and then throw exception
throw new Exception(e.getMessage(), e);
}
}
这将始终调用当前线程中的第一个可调用对象,因为 Runnable::run
用作 Executor
将立即在调用线程中执行操作。但在其他所有方面都统一对待,尤其是异常处理。当只有一项工作时,allOf
使用空数组调用将什么都不做,return 一个已经完成的未来,这将达到预期的效果。
我通过 ThreadPoolExecutor
执行了一些调用。如果线程列表仅包含 1 个可调用项,那么我直接调用 CallableService
的 call
方法。如果列表包含超过 1 个可调用对象,那么我将通过线程池执行器并行执行所有这些线程。
如何使用 Java 8 CompletableFuture 实现此目的?如果future.get()
增强以避免阻塞,那将是一个加号。
private static ThreadPoolExecutor myThreadPoolExecutor = new ThreadPoolExecutor(0, 100, 5L, TimeUnit.SECONDS, new SynchronousQueue<>());
public static void execute(List<Callable<Boolean>> threadList) throws Exception {
List<Future<Boolean>> futureList = null;
CallableService singleService = (CallableService) threadList.get(0);
if (1 == threadList.size()) {
singleService.call();
}
else {
try {
futureList = myThreadPoolExecutor.invokeAll(threadList);
}
catch (InterruptedException e) {
e.printStackTrace();
}
}
if (null != futureList) {
for (Future<Boolean> future : futureList) {
try {
future.get();
}
catch (Exception e) {
//do some calculations here and then throw exception
throw new Exception(e.getMessage(), e);
}
}
}
}
Future.isDone() 告诉我们执行者是否完成了任务处理。如果任务完成,它将 return 为真,否则,它将 return 为假。
for (Future<Boolean> future : futureList) {
while(!future.isDone())
{
doSOmethingElse();
Thread.sleep(300);//Optional
}
try {
future.get();
}
catch (Exception e)
{
//do some calculations here and then throw exception
throw new Exception(e.getMessage(), e);
}
}
但我们不必担心这一点,因为在确保任务完成后我们到达了调用 get() 的地步。
I execute a few callables through ThreadPoolExecutor. If thread list contains only 1 callable then I directly call call method of my CallableService. If list contains more than 1 callables then I execute all those threads in parallel via thread pool executor.
我猜你已经实现了这部分。 (如果您的作业繁重并且配置了 100 个线程 运行ning,您可能 运行 会遇到内存使用问题。但这是一个不同的问题。)
And if future.get() is enhanced to avoid blocking, that will be a plus.
为此,您可以采用以下方法:
- 创建另一个
ExecutorService
,其工作将只是 运行Future.get()
电话。 - 将您的
Future.get()
提交到该服务,如下所示。 关闭它并等待终止。
if (null != futureList) { ExecutorService waitSvc = Executors.newCachedThreadPool(); for (Future<Boolean> future : futureList) { try { waitSvc.submit( () -> future.get() ); } catch (Exception e) { //do some calculations here and then throw exception throw new Exception(e.getMessage(), e); } } waitSvc.shutdown(); //This may take some time. You may want to call awaitTermination() after this. }
但是,我觉得你应该重新设计使用这么多线程的整体方法,除非这只是一个学习应用程序。
不需要 CompletableFuture
,因为您使用 ExecutorService
的方式就足够了,不过,代码流的某些方面可以改进。您获取第一个元素,即使不需要,也可以无缘无故地将其转换为 CallableService
,因为您已经可以通过 Callable
接口调用该方法。在另一个分支中,您正在捕获 InterruptedException
并继续,因此调用者永远不会知道并非所有作业都已执行。在直接的代码流中,您不需要检查 null
:
public static void execute(List<Callable<Boolean>> threadList) throws Exception {
if(1 == threadList.size()) {
Callable<Boolean> singleService = threadList.get(0);
singleService.call();
}
else {
List<Future<Boolean>> futureList = myThreadPoolExecutor.invokeAll(threadList);
for(Future<Boolean> future : futureList) {
try {
future.get();
}
catch(Exception e) {
//do some calculations here and then throw exception
throw new Exception(e.getMessage(), e);
}
}
}
}
您可以进一步缩短为
public static void execute(List<Callable<Boolean>> threadList) throws Exception {
if(1 == threadList.size()) {
threadList.get(0).call();
}
else {
for(Future<Boolean> future : myThreadPoolExecutor.invokeAll(threadList)) {
try {
future.get();
}
catch(Exception e) {
//do some calculations here and then throw exception
throw new Exception(e.getMessage(), e);
}
}
}
}
但这是首选编码风格的问题。但请注意,它引起了我的注意,在单个元素的情况下,您没有执行相同的异常处理。
要使用 CompletableFuture
,我们需要一个适配器方法,因为便捷方法 supplyAsync
需要 Supplier
而不是 Callable
。使用
public static void execute(List<Callable<Boolean>> threadList) throws Exception {
if(1 == threadList.size()) {
threadList.get(0).call();
}
else {
CompletableFuture<?> all = CompletableFuture.allOf(
threadList.stream()
.map(c -> callAsync(c, myThreadPoolExecutor))
.toArray(CompletableFuture<?>[]::new));
try {
all.get();
}
catch(Exception e) {
//do some calculations here and then throw exception
throw new Exception(e.getMessage(), e);
}
}
}
public static <R> CompletableFuture<R> callAsync(Callable<R> callable, Executor e) {
CompletableFuture<R> cf = new CompletableFuture<>();
CompletableFuture.runAsync(() -> {
try { cf.complete(callable.call()); }
catch(Throwable ex) { cf.completeExceptionally(ex); }
}, e);
return cf;
}
所以我们没有 invokeAll
负责提交所有作业。我们必须通过循环或流操作手动执行此操作。另一方面,我们通过代表完成状态的 allOf
获得一个单一的未来,如果至少有一个工作失败,则例外。
与等待完成的 invokeAll
不同,allOf
仅 return 未来,因此等待完成的是 all.get()
调用。我们可以在它之前做其他事情,甚至可以使用这个 属性 来始终在调用者线程中执行第一项工作:
public static void execute(List<Callable<Boolean>> threadList) throws Exception {
CompletableFuture<?> tail = CompletableFuture.allOf(
threadList.stream().skip(1)
.map(c -> callAsync(c, myThreadPoolExecutor))
.toArray(CompletableFuture<?>[]::new)),
head = callAsync(threadList.get(0), Runnable::run);
try {
head.get();
tail.get();
}
catch(Exception e) {
//do some calculations here and then throw exception
throw new Exception(e.getMessage(), e);
}
}
这将始终调用当前线程中的第一个可调用对象,因为 Runnable::run
用作 Executor
将立即在调用线程中执行操作。但在其他所有方面都统一对待,尤其是异常处理。当只有一项工作时,allOf
使用空数组调用将什么都不做,return 一个已经完成的未来,这将达到预期的效果。