如何在Java8中不阻塞地执行依赖任务
How to execute dependent tasks in Java 8 without any blocking
不久前我回答了这个问题:Executing Dependent tasks in parallel in Java 但是使用 future.get() 会阻塞当前线程,如果获取的线程太多,线程池可能会用完线程() 被同时调用。如何从 Java 中的期货组合期货?
我想我会自己回答这个问题,可以在 java 中使用 CompletableFutures 而不是 Futures。 CompletableFutures 允许通过 thenCombine 方法进行组合,这类似于 scalas flatMap。现在没有阻塞发生,只需要3个线程就可以达到最快的时间。
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.BiFunction;
import java.util.function.Supplier;
public class Barrista
{
// number of threads used in executor
static final int NOTHREADS = 3;
// time of each task
static final int HEATWATER = 1000;
static final int GRINDBEANS = 1000;
static final int FROTHMILK = 1000;
static final int BREWING = 1000;
static final int COMBINE = 1000;
// method to simulate work (pause current thread without throwing checked exception)
public static void pause(long t)
{
try
{
Thread.sleep(t);
}
catch(Exception e)
{
throw new Error(e.toString());
}
}
// task to heat some water
static class HeatWater implements Supplier<String>
{
@Override
public String get()
{
System.out.println("Heating Water");
pause(HEATWATER);
return "hot water";
}
}
// task to grind some beans
static class GrindBeans implements Supplier<String>
{
@Override
public String get()
{
System.out.println("Grinding Beans");
pause(GRINDBEANS);
return "grinded beans";
}
}
// task to froth some milk
static class FrothMilk implements Supplier<String>
{
@Override
public String get()
{
System.out.println("Frothing some milk");
pause(FROTHMILK);
return "some milk";
}
}
// task to brew some coffee
static class Brew implements BiFunction<String,String, String>
{
@Override
public String apply(String groundBeans, String heatedWater)
{
System.out.println("Brewing coffee with " + groundBeans + " and " + heatedWater);
pause(BREWING);
return "brewed coffee";
}
}
// task to combine brewed coffee and milk
static class Combine implements BiFunction<String,String, String>
{
@Override
public String apply(String frothedMilk, String brewedCoffee)
{
System.out.println("Combining " + frothedMilk + " "+ brewedCoffee);
pause(COMBINE);
return "Final Coffee";
}
}
public static void main(String[] args)
{
ExecutorService executor = Executors.newFixedThreadPool(NOTHREADS);
long startTime = System.currentTimeMillis();
try
{
// create all the tasks and let the executor handle the execution order
CompletableFuture<String> frothMilk = CompletableFuture.supplyAsync(new FrothMilk(), executor);
CompletableFuture<String> heatWaterFuture = CompletableFuture.supplyAsync(new HeatWater(), executor);
CompletableFuture<String> grindBeans = CompletableFuture.supplyAsync(new GrindBeans(), executor);
CompletableFuture<String> brew = heatWaterFuture.thenCombine(grindBeans, new Brew());
CompletableFuture<String> coffee = brew.thenCombine(frothMilk, new Combine());
// final coffee
System.out.println("Here is the coffee:" + coffee.get());
// analyzing times:
System.out.println("\n\n");
System.out.println("Actual time: \t\t\t\t" + (System.currentTimeMillis() - startTime)/1000.0);
// compute the quickest possible time:
long path1 = Math.max(GRINDBEANS, HEATWATER)+ BREWING + COMBINE;
long path2 = FROTHMILK + COMBINE;
System.out.println("Quickest time multi-threaded:\t\t" + Math.max(path1, path2)/1000.0);
// compute the longest possible time:
long longestTime = HEATWATER + GRINDBEANS + FROTHMILK + BREWING + COMBINE;
System.out.println("Quickest time single-threaded thread:\t" + longestTime/1000.0);
}
catch (Exception e)
{
e.printStackTrace();
}
finally
{
executor.shutdown();
}
}
}
Java 8 引入了 CompletableFuture,您不需要特别阻止 get
调用,除非您根据完成阶段触发回调。
A Future that may be explicitly completed (setting its value and
status), and may be used as a CompletionStage, supporting dependent
functions and actions that trigger upon its completion.
阅读更多关于documentation
在 java 8 之前,这个概念在 google groovy 库中可用,请阅读 documentation and spring library too。
Dexecutor 来救场了。
免责声明我是所有者
依赖任务执行,使用dexecutor轻松完成。
顺序
DefaultDexecutor<Integer, Integer> executor = newTaskExecutor();
//Building
executor.addDependency(1, 2);
executor.addDependency(2, 3);
executor.addDependency(3, 4);
executor.addDependency(4, 5);
//Execution
executor.execute(ExecutionConfig.TERMINATING);
平行
DefaultDexecutor<Integer, Integer> executor = newTaskExecutor();
//Building
executor.addIndependent(1);
executor.addIndependent(2);
executor.addIndependent(3);
executor.addIndependent(4);
//Execution
executor.execute(ExecutionConfig.TERMINATING);
甚至混合
DefaultDexecutor<Integer, Integer> executor = newTaskExecutor();
executor.addDependency(1, 2);
executor.addDependency(1, 2);
executor.addDependency(1, 3);
executor.addDependency(3, 4);
executor.addDependency(3, 5);
executor.addDependency(3, 6);
executor.addDependency(2, 7);
executor.addDependency(2, 9);
executor.addDependency(2, 8);
executor.addDependency(9, 10);
executor.addDependency(12, 13);
executor.addDependency(13, 4);
executor.addDependency(13, 14);
executor.addIndependent(11);
executor.execute(new ExecutionConfig().immediateRetrying(2));
参考How Do I了解更多详情
不久前我回答了这个问题:Executing Dependent tasks in parallel in Java 但是使用 future.get() 会阻塞当前线程,如果获取的线程太多,线程池可能会用完线程() 被同时调用。如何从 Java 中的期货组合期货?
我想我会自己回答这个问题,可以在 java 中使用 CompletableFutures 而不是 Futures。 CompletableFutures 允许通过 thenCombine 方法进行组合,这类似于 scalas flatMap。现在没有阻塞发生,只需要3个线程就可以达到最快的时间。
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.BiFunction;
import java.util.function.Supplier;
public class Barrista
{
// number of threads used in executor
static final int NOTHREADS = 3;
// time of each task
static final int HEATWATER = 1000;
static final int GRINDBEANS = 1000;
static final int FROTHMILK = 1000;
static final int BREWING = 1000;
static final int COMBINE = 1000;
// method to simulate work (pause current thread without throwing checked exception)
public static void pause(long t)
{
try
{
Thread.sleep(t);
}
catch(Exception e)
{
throw new Error(e.toString());
}
}
// task to heat some water
static class HeatWater implements Supplier<String>
{
@Override
public String get()
{
System.out.println("Heating Water");
pause(HEATWATER);
return "hot water";
}
}
// task to grind some beans
static class GrindBeans implements Supplier<String>
{
@Override
public String get()
{
System.out.println("Grinding Beans");
pause(GRINDBEANS);
return "grinded beans";
}
}
// task to froth some milk
static class FrothMilk implements Supplier<String>
{
@Override
public String get()
{
System.out.println("Frothing some milk");
pause(FROTHMILK);
return "some milk";
}
}
// task to brew some coffee
static class Brew implements BiFunction<String,String, String>
{
@Override
public String apply(String groundBeans, String heatedWater)
{
System.out.println("Brewing coffee with " + groundBeans + " and " + heatedWater);
pause(BREWING);
return "brewed coffee";
}
}
// task to combine brewed coffee and milk
static class Combine implements BiFunction<String,String, String>
{
@Override
public String apply(String frothedMilk, String brewedCoffee)
{
System.out.println("Combining " + frothedMilk + " "+ brewedCoffee);
pause(COMBINE);
return "Final Coffee";
}
}
public static void main(String[] args)
{
ExecutorService executor = Executors.newFixedThreadPool(NOTHREADS);
long startTime = System.currentTimeMillis();
try
{
// create all the tasks and let the executor handle the execution order
CompletableFuture<String> frothMilk = CompletableFuture.supplyAsync(new FrothMilk(), executor);
CompletableFuture<String> heatWaterFuture = CompletableFuture.supplyAsync(new HeatWater(), executor);
CompletableFuture<String> grindBeans = CompletableFuture.supplyAsync(new GrindBeans(), executor);
CompletableFuture<String> brew = heatWaterFuture.thenCombine(grindBeans, new Brew());
CompletableFuture<String> coffee = brew.thenCombine(frothMilk, new Combine());
// final coffee
System.out.println("Here is the coffee:" + coffee.get());
// analyzing times:
System.out.println("\n\n");
System.out.println("Actual time: \t\t\t\t" + (System.currentTimeMillis() - startTime)/1000.0);
// compute the quickest possible time:
long path1 = Math.max(GRINDBEANS, HEATWATER)+ BREWING + COMBINE;
long path2 = FROTHMILK + COMBINE;
System.out.println("Quickest time multi-threaded:\t\t" + Math.max(path1, path2)/1000.0);
// compute the longest possible time:
long longestTime = HEATWATER + GRINDBEANS + FROTHMILK + BREWING + COMBINE;
System.out.println("Quickest time single-threaded thread:\t" + longestTime/1000.0);
}
catch (Exception e)
{
e.printStackTrace();
}
finally
{
executor.shutdown();
}
}
}
Java 8 引入了 CompletableFuture,您不需要特别阻止 get
调用,除非您根据完成阶段触发回调。
A Future that may be explicitly completed (setting its value and status), and may be used as a CompletionStage, supporting dependent functions and actions that trigger upon its completion.
阅读更多关于documentation
在 java 8 之前,这个概念在 google groovy 库中可用,请阅读 documentation and spring library too。
Dexecutor 来救场了。
免责声明我是所有者
依赖任务执行,使用dexecutor轻松完成。
顺序
DefaultDexecutor<Integer, Integer> executor = newTaskExecutor();
//Building
executor.addDependency(1, 2);
executor.addDependency(2, 3);
executor.addDependency(3, 4);
executor.addDependency(4, 5);
//Execution
executor.execute(ExecutionConfig.TERMINATING);
平行
DefaultDexecutor<Integer, Integer> executor = newTaskExecutor();
//Building
executor.addIndependent(1);
executor.addIndependent(2);
executor.addIndependent(3);
executor.addIndependent(4);
//Execution
executor.execute(ExecutionConfig.TERMINATING);
甚至混合
DefaultDexecutor<Integer, Integer> executor = newTaskExecutor();
executor.addDependency(1, 2);
executor.addDependency(1, 2);
executor.addDependency(1, 3);
executor.addDependency(3, 4);
executor.addDependency(3, 5);
executor.addDependency(3, 6);
executor.addDependency(2, 7);
executor.addDependency(2, 9);
executor.addDependency(2, 8);
executor.addDependency(9, 10);
executor.addDependency(12, 13);
executor.addDependency(13, 4);
executor.addDependency(13, 14);
executor.addIndependent(11);
executor.execute(new ExecutionConfig().immediateRetrying(2));
参考How Do I了解更多详情