ExecutorService:更好的线程流调节
ExecutorService: Better Thread-flow Regulation
我有一个基于 ExecutorService 线程流规则的问题:
我想 .submit() 多个线程执行,我希望一些线程 等待特定的先前线程完成执行。 .
到目前为止,我知道使用 CountDownLatch() 执行此操作的一种方法。以下示例说明了 2 个线程需要在第三个线程开始之前完成:
public class Example{
public static void main(String[] args) throws InterruptedException {
CountDownLatch cdl = new CountDownLatch(2); //count of 2
ExecutorService executor = Executors.newCachedThreadPool();
executor.submit(new Runnable() {
public void run() {
method1(); //simulation of a useful method 1
cdl.countDown(); //reduces the count by one
}
});
executor.submit(new Runnable() {
public void run() {
method2(); //simulation of a useful method 2
cdl.countDown();
}
});
cdl.await(); //will wait until both methods are complete
executor.submit(new Runnable() {
public void run() {
result(); //simulation of a method that needs previous threads to execute }
});
}
}
很明显,这种方法对于这样的工作来说并不是最优的,一方面,不能在不依赖于 CDL 本身的 .await() 之后添加额外的线程。
因此,与 CDL 相比,使用 ExecutorService 调节线程流的更好替代方法是什么,可以更好地操纵线程执行。
就等CountDownLatch
里面第3个Runnable
executor.submit(new Runnable() {
public void run() {
cdl.await(); //will wait until both methods are complete
result(); //simulation of a method that needs previous threads to execute }
});
}
您可能需要将 cdl
声明为 final 以在匿名 Runnable
实例中引用它:
final CountDownLatch cdl = new CountDownLatch(2); //count of 2
替代方法
有一个方法可以像这样创建第 3 个任务:
void createThirdTask() {
executor.submit(new Runnable() {
public void run() {
result(); //simulation of a method that needs previous threads to execute }
});
}
}
前两个任务和计数器之间有一个共享锁。
private ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
private int count = 2;
在 method1()
和 method2()
内部递减 CDL 的值,如果它达到零则触发第三个任务。
void method1() {
//your actual code goes here
try {
lock.writeLock().lock();
if(count-- == 0) {
createThirdTask();
}
} finally {
lock.writeLock().unlock()
}
}
ReentrantReadWriteLock 用于防止竞争条件。
我不确定 Java 是否真的有一个好的标准机制供您执行此操作。 Eclipse 的内部代码有很多额外的 类 来处理任务及其依赖性。
不过,我认为您可以 "use" CompletableFuture
这样做:
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class Main {
public static void main(String[] args) {
ExecutorService executor = Executors.newCachedThreadPool();
Runnable r1 = () -> System.out.println("runnable 1");
Runnable r2 = () -> System.out.println("runnable 2");
Runnable r3 = () -> System.out.println("runnable 3");
CompletableFuture<Void> cf1 = CompletableFuture.runAsync(r1, executor);
CompletableFuture<Void> cf2 = CompletableFuture.runAsync(r2, executor);
cf1.runAfterBoth(cf2, r3);
}
}
如果您的情况更复杂,您最好搜索 'Directed Acyclic Graph' 任务库。
我建议将 CompletableFuture 用于此类线程任务。它可能看起来像这样:
public class RunAfterNThreads {
public static void main(String[] args) throws ExecutionException, InterruptedException {
CompletableFuture.supplyAsync(RunAfterNThreads::runFirstBatchOfThreads)
.thenAcceptAsync((t) -> runSecondBatchOfThreads(null)).get();
}
private static Object runSecondBatchOfThreads(Object something) {
return something;
}
private static <U> U runFirstBatchOfThreads() {
return null;
}
}
如果你想为此使用一些库,我建议探索像 Akka(事件驱动)这样的东西。
我有一个基于 ExecutorService 线程流规则的问题: 我想 .submit() 多个线程执行,我希望一些线程 等待特定的先前线程完成执行。 .
到目前为止,我知道使用 CountDownLatch() 执行此操作的一种方法。以下示例说明了 2 个线程需要在第三个线程开始之前完成:
public class Example{
public static void main(String[] args) throws InterruptedException {
CountDownLatch cdl = new CountDownLatch(2); //count of 2
ExecutorService executor = Executors.newCachedThreadPool();
executor.submit(new Runnable() {
public void run() {
method1(); //simulation of a useful method 1
cdl.countDown(); //reduces the count by one
}
});
executor.submit(new Runnable() {
public void run() {
method2(); //simulation of a useful method 2
cdl.countDown();
}
});
cdl.await(); //will wait until both methods are complete
executor.submit(new Runnable() {
public void run() {
result(); //simulation of a method that needs previous threads to execute }
});
}
}
很明显,这种方法对于这样的工作来说并不是最优的,一方面,不能在不依赖于 CDL 本身的 .await() 之后添加额外的线程。
因此,与 CDL 相比,使用 ExecutorService 调节线程流的更好替代方法是什么,可以更好地操纵线程执行。
就等CountDownLatch
里面第3个Runnable
executor.submit(new Runnable() {
public void run() {
cdl.await(); //will wait until both methods are complete
result(); //simulation of a method that needs previous threads to execute }
});
}
您可能需要将 cdl
声明为 final 以在匿名 Runnable
实例中引用它:
final CountDownLatch cdl = new CountDownLatch(2); //count of 2
替代方法
有一个方法可以像这样创建第 3 个任务:
void createThirdTask() {
executor.submit(new Runnable() {
public void run() {
result(); //simulation of a method that needs previous threads to execute }
});
}
}
前两个任务和计数器之间有一个共享锁。
private ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
private int count = 2;
在 method1()
和 method2()
内部递减 CDL 的值,如果它达到零则触发第三个任务。
void method1() {
//your actual code goes here
try {
lock.writeLock().lock();
if(count-- == 0) {
createThirdTask();
}
} finally {
lock.writeLock().unlock()
}
}
ReentrantReadWriteLock 用于防止竞争条件。
我不确定 Java 是否真的有一个好的标准机制供您执行此操作。 Eclipse 的内部代码有很多额外的 类 来处理任务及其依赖性。
不过,我认为您可以 "use" CompletableFuture
这样做:
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class Main {
public static void main(String[] args) {
ExecutorService executor = Executors.newCachedThreadPool();
Runnable r1 = () -> System.out.println("runnable 1");
Runnable r2 = () -> System.out.println("runnable 2");
Runnable r3 = () -> System.out.println("runnable 3");
CompletableFuture<Void> cf1 = CompletableFuture.runAsync(r1, executor);
CompletableFuture<Void> cf2 = CompletableFuture.runAsync(r2, executor);
cf1.runAfterBoth(cf2, r3);
}
}
如果您的情况更复杂,您最好搜索 'Directed Acyclic Graph' 任务库。
我建议将 CompletableFuture 用于此类线程任务。它可能看起来像这样:
public class RunAfterNThreads {
public static void main(String[] args) throws ExecutionException, InterruptedException {
CompletableFuture.supplyAsync(RunAfterNThreads::runFirstBatchOfThreads)
.thenAcceptAsync((t) -> runSecondBatchOfThreads(null)).get();
}
private static Object runSecondBatchOfThreads(Object something) {
return something;
}
private static <U> U runFirstBatchOfThreads() {
return null;
}
}
如果你想为此使用一些库,我建议探索像 Akka(事件驱动)这样的东西。