使用 BlockingQueue 和多线程。所有线程都陷入等待

Working with BlockingQueue and Multithreads. All Threads Stuck in Waiting

我正在创建一个有多个套件部署的系统,每个部署都有一个测试套件队列。由于我希望测试套件在其单独的套件部署中同时 运行,因此我需要向代码中添加并发性。我已经创建了我正在使用的代码的简化版本,但是当我尝试关闭它时并发部分不起作用。

当 Runner.stopEverything() 被调用时,结果是队列被清空,它等待线程完成,但即使测试全部完成,等待也永远不会完成通知所有()。结果是这个过程只是坐在那里永无止境。我在调试模式下查看它,结果是所有 3 个线程都显示等待。

主要:

public static void main(String args[]) throws Exception {
  Runner.queueTestSuites("SD1", Arrays.asList("A", "B", "C"));
  Runner.queueTestSuites("SD2", Arrays.asList("D", "E", "F"));
  Runner.queueTestSuites("SD3", Arrays.asList("G", "H", "I"));

  Thread.sleep(5000);

  System.out.println("~~~~~~~~~~~~~~~~~~~~~~~~");
  Runner.stopEverything();
}

亚军:

public class Runner {
  private static Map<String, TestQueue> runnerQueueMap = new ConcurrentHashMap<>();

  public synchronized static void queueTestSuites(String suiteDeployment, List<String> testSuiteQueueAsJSON) throws Exception {
    TestQueue queue;
    if(runnerQueueMap.containsKey(suiteDeployment)) {
      queue = runnerQueueMap.get(suiteDeployment);
    } else {
      queue = new TestQueue(suiteDeployment);
    }
    for (int i = 0; i < testSuiteQueueAsJSON.size(); i++) {
      String name = testSuiteQueueAsJSON.get(i);
      queue.addToQueue(name);
    }
    runnerQueueMap.put(suiteDeployment,queue);
  }

  public synchronized static void stopEverything() throws InterruptedException {
    for (String s : runnerQueueMap.keySet()) {
      TestQueue q = runnerQueueMap.get(s);
      q.saveAndClearQueue();
    }

    for (String s : runnerQueueMap.keySet()) {
      TestQueue q = runnerQueueMap.get(s);
      q.waitForThread();
    }

    System.out.println("All done at " + new Date());
  }
}

测试队列:

public class TestQueue {

  private Consumer consumer;
  private Thread consumerThread;
  private java.util.concurrent.BlockingQueue<String> queue;
  private String suiteDeployment;

  public TestQueue(String suiteDeployment) {
    this.suiteDeployment = suiteDeployment;
    queue = new ArrayBlockingQueue<>(100);
    startConsumer();
  }

  public void addToQueue(String testSuite) {
    try {
      queue.put(testSuite);
    } catch (InterruptedException e) {
      e.printStackTrace();
    }
  }

  public synchronized void waitForThread() {
    try {
      if (consumer.running.get()) {
        synchronized (consumerThread) {
          System.out.println("Waiting for " + consumerThread.getName());
          consumerThread.wait();
        }
      }
      System.out.println("Thread complete at " + new Date());
    } catch (InterruptedException e) {
      e.printStackTrace();
    }
  }

  public void saveAndClearQueue() {
    List<String> suiteNames = new ArrayList<>();
    for (String suite : queue) {
      suiteNames.add(suite);
    }
    queue.clear();
  }

  private void startConsumer() {
    consumer = new Consumer(queue,suiteDeployment);
    consumerThread = new Thread(consumer);
    consumerThread.start();
  }

  private class Consumer implements Runnable{
    private BlockingQueue<String> queue;
    private String suiteDeployment;
    public AtomicBoolean running;

    public Consumer(BlockingQueue<String> queue, String suiteDeployment){
      this.queue = queue;
      this.suiteDeployment = suiteDeployment;
      this.running = new AtomicBoolean(false);
    }

    @Override
    public void run() {
      try{
        while(!Thread.currentThread().isInterrupted()) {
          String testSuite = queue.take();
          this.running.set(true);
          new Test(testSuite, suiteDeployment).run();
          this.running.set(false);
        }
        notifyAll();
      }catch(Exception e) {
        e.printStackTrace();
      }
    }
  }
}

测试:

public class Test {

  String testSuite = "";
  String suiteDeployment = "";

  public Test(String testSuite, String suiteDeployment) {
    this.testSuite = testSuite;
    this.suiteDeployment = suiteDeployment;
  }

  public void run() {
    int time = new Random().nextInt() % 10000;
    time = Math.max(time, 3000);
    System.out.println("Test Started: " + testSuite + " on " + suiteDeployment + " at " + new Date() + " running for " + time + " on thread " + Thread.currentThread().getName());
    try {
      Thread.sleep(time);
    } catch (InterruptedException e) {
      e.printStackTrace();
    }
    System.out.println("Test Completed: " + testSuite + " on " + suiteDeployment + " at " + new Date());
  }
}

在您的消费者的 运行 方法中,您有一个对 queue.take() 的阻塞调用,这意味着它将阻塞直到队列中有一个项目。您 运行 最终超出了队列中的元素,并且您的所有线程都被 queue.take() 调用阻塞,等待更多元素可供处理。

虽然您的调用是在 while 循环中检查线程是否被中断,但您实际上从未中断线程,因此它永远不会进入 while 循环评估并在调用 queue.take() 时被阻塞

因此您的线程在等待输入在您的阻塞队列中可用时保持等待状态

此外,您的 saveAndClear 方法必须锁定正确的对象,即队列本身,如下所示:

public void saveAndClearQueue() {
    List<String> suiteNames = new ArrayList<String>();
    synchronized (queue) {
        for (String suite : queue) {
            suiteNames.add(suite);
        }
        queue.clear();
    }
    System.out.println("Saved(not executed) : "+suiteNames);
}

你的 waitForThread 方法应该像下面这样:

public void waitForThread() {
    synchronized (consumerThread) {
        while (consumer.running.get()) {
            try {
                consumerThread.wait(100);
            } catch (InterruptedException e) {
                break;
            }
        }
    }

    if (!consumer.running.get()) {
        consumerThread.interrupt();
    }

    System.out.println("Thread complete at " + new Date());
}