ExecutorService:更好的线程流调节

ExecutorService: Better Thread-flow Regulation

我有一个基于 ExecutorService 线程流规则的问题: 我想 .submit() 多个线程执行,我希望一些线程 等待特定的先前线程完成执行。 .

到目前为止,我知道使用 CountDownLatch() 执行此操作的一种方法。以下示例说明了 2 个线程需要在第三个线程开始之前完成:

public class Example{

public static void main(String[] args) throws InterruptedException {

    CountDownLatch cdl = new CountDownLatch(2); //count of 2
    ExecutorService executor = Executors.newCachedThreadPool();
    executor.submit(new Runnable() {
        public void run() {
             method1(); //simulation of a useful method 1
            cdl.countDown(); //reduces the count by one
        }
    });
    executor.submit(new Runnable() {
        public void run() {
             method2(); //simulation of a useful method 2
            cdl.countDown();
        }
    });
    cdl.await(); //will wait until both methods are complete
    executor.submit(new Runnable() { 
        public void run() {
        result(); //simulation of a method that needs previous threads to execute }
        });
    }
}

很明显,这种方法对于这样的工作来说并不是最优的,一方面,不能在不依赖于 CDL 本身的 .await() 之后添加额外的线程。

因此,与 CDL 相比,使用 ExecutorService 调节线程流的更好替代方法是什么,可以更好地操纵线程执行。

就等CountDownLatch里面第3个Runnable

executor.submit(new Runnable() { 
    public void run() {
       cdl.await(); //will wait until both methods are complete
       result(); //simulation of a method that needs previous threads to execute }
    });
}

您可能需要将 cdl 声明为 final 以在匿名 Runnable 实例中引用它:

final CountDownLatch cdl = new CountDownLatch(2); //count of 2 

替代方法

有一个方法可以像这样创建第 3 个任务:

void createThirdTask() {
    executor.submit(new Runnable() { 
        public void run() {
           result(); //simulation of a method that needs previous threads to execute }
        });
    }
}

前两个任务和计数器之间有一个共享锁。

private ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
private int count = 2;

method1()method2() 内部递减 CDL 的值,如果它达到零则触发第三个任务。

void method1() {
        //your actual code goes here
    try {
        lock.writeLock().lock();
        if(count-- == 0) {
            createThirdTask();
        }
    } finally {
        lock.writeLock().unlock()
    }
}

ReentrantReadWriteLock 用于防止竞争条件。

我不确定 Java 是否真的有一个好的标准机制供您执行此操作。 Eclipse 的内部代码有很多额外的 类 来处理任务及其依赖性。

不过,我认为您可以 "use" CompletableFuture 这样做:

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class Main {
    public static void main(String[] args) {
        ExecutorService executor = Executors.newCachedThreadPool();
        Runnable r1 = () -> System.out.println("runnable 1");
        Runnable r2 = () -> System.out.println("runnable 2");
        Runnable r3 = () -> System.out.println("runnable 3");


        CompletableFuture<Void> cf1 = CompletableFuture.runAsync(r1, executor);
        CompletableFuture<Void> cf2 = CompletableFuture.runAsync(r2, executor);

        cf1.runAfterBoth(cf2, r3);
  }
}

如果您的情况更复杂,您最好搜索 'Directed Acyclic Graph' 任务库。

我建议将 CompletableFuture 用于此类线程任务。它可能看起来像这样:

public class RunAfterNThreads {

public static void main(String[] args) throws ExecutionException, InterruptedException {
    CompletableFuture.supplyAsync(RunAfterNThreads::runFirstBatchOfThreads)
            .thenAcceptAsync((t) -> runSecondBatchOfThreads(null)).get();
}

private static Object runSecondBatchOfThreads(Object something) {
    return something;
}

private static <U> U runFirstBatchOfThreads() {
    return null;
}

}

如果你想为此使用一些库,我建议探索像 Akka(事件驱动)这样的东西。