加入两个不同的ExecutorService

Join a two different ExecutorService

我想加入两个在 ExecutorService 中执行的线程。

public class CURD {

  public static ExecutorService executorService = Executors.newCachedThreadPool();
  
 @Autowired
 Logging logging;

  public void Update(List<? extends HBase> save, List<? extends HBase> delete) {
        Thread t = new Thread(() -> {
            System.out.println("Started Main Thread...");
            try {
                Thread.sleep(1500);

            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("End Main Thread...");
        },"Thread-1");

        logging.setPredecessor(t);
        executorService.submit(t);
    }
}

第二个Class: 此 class 线程应等待第一个线程完成。 但它不会等待第一个线程完成。 我不确定这样做是否正确。

有人可以告诉我如何加入在 ExecutorService 中执行的两个线程吗?

import static com.demo.executorService;

public class Logging {
   
   private Thread predecessor;
   public void  setPredecessor(Thread t) {
        this.predecessor = t;
    }

  private void loggingInfo() {
      Thread run = new Thread( () ->{
                try {
                    if (predecessor != null) {
                        System.out.println(Thread.currentThread().getName() + " Started");
                        predecessor.join();
                        System.out.println(Thread.currentThread().getName() + " Finished");
                    }
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            });
            addTask(run);
   }

   public void addTask(Runnable run) {
        System.out.println("Runnable Thread logAround.....");
        CompletableFuture.runAsync((run), executorService).exceptionally(ex -> {
            System.out.println("exception occurred " + ex);
            return null;
        });
    }
}

如果想要在一组线程之间进行同步,可以使用 Java CyclicBarrier class:

A synchronization aid that allows a set of threads to all wait for each other to reach a common barrier point. CyclicBarriers are useful in programs involving a fixed sized party of threads that must occasionally wait for each other. The barrier is called cyclic because it can be re-used after the waiting threads are released.

要做到这一点,首先创建 CyclicBarrier 对象,对应 parties 个数,即:

private final CyclicBarrier barrier = new CyclicBarrier(NUMBER_OF_PARIES);

正式地从 Java 文档中可以读到各方是:

the number of threads that must invoke {@link #await} before the barrier is tripped

非正式地, 是必须调用循环屏障的线程数,等待,然后它们才能移动前进。

之后,您需要将屏障实例对象引用传递给每个应该等待的线程,并调用wait barrier.await() ), 因此。内容如下:

  public void Update(..., CyclicBarrier barrier) {
        Thread t = new Thread(() -> {
            System.out.println("Started Main Thread...");
            try {
                 Thread.sleep(1500);
                 barrier.await(); // <--- wait on the barrier
            } catch (InterruptedException | BrokenBarrierException e) {
                 e.printStackTrace();
             }
            System.out.println("End Main Thread...");
        },"Thread-1");
        ...
    }

对必须等待的其他线程重复此过程。确保参与方的数量( NUMBER_OF_PARIES)与应在循环屏障上等待的线程数相匹配,否则会发生死锁。

现在您正在使用 cyclic barrier,您可以清理部分代码,例如,您可以删除与 [=21= 的字段 predecessor 相关的所有逻辑] class.

如果你只想让Thread 2等待Thread 1,那么你可以使用CountDownLatch

A synchronization aid that allows one or more threads to wait until a set of operations being performed in other threads completes. A CountDownLatch is initialized with a given count. The await methods block until the current count reaches zero due to invocations of the countDown() method, after which all waiting threads are released and any subsequent invocations of await return immediately. This is a one-shot phenomenon -- the count cannot be reset. If you need a version that resets the count, consider using a CyclicBarrier.

首先创建只有 1 计数的 CountDownLatch 对象:

private final CountDownLatch block_thread2 = new CountDownLatch(1);

并将其传递给 Thread 2,因为您希望此线程等待 Thread 1,请调用 block_thread2.await();

      Thread run = new Thread( () ->{
                   try {
                        ....
                        block_thread2.await(); // wait for Thread 2
                   } catch (InterruptedException e) {
                        // deal with it
                   }
     });
            ...

并在 Thread 1 中添加 wait.countDown();:

  public void Update(...) {
        Thread t = new Thread(() -> {
                   System.out.println("Started Main Thread...");
                   try {
                        Thread.sleep(1500);
                        wait.countDown();
                   } catch (InterruptedException e) {
                        // deal with it
            }
            System.out.println("End Main Thread...");
        },"Thread-1");
        ...
    }

因此,以这种方式,Thread 2 将等待 Thread 1,但 Thread 1 永远不会等待 Thread 2