创建一个不退出的线程

Creating a Thread that does not exit

我想知道创建不终止的 Java 线程的最佳方法是什么。 目前,我基本上有一个 "Runner" 基本上看起来像:

ExecutorService pool = Executors.newFixedThreadPool(3);
for (int i = 0; i < numThreads; ++i) {
    pool.submit(new Task());
}
pool.shutdown();

任务看起来像这样

public class Task {
    ...
    public void run() {
        while(true) { }
    }
}

我的方法有两个问题:

  1. 我是否应该创建一个仅在完成工作后 returns 的任务,并继续生成执行最少工作量的线程?我担心开销,但不确定如何衡量。

  2. 如果我有一个无限循环的线程,当我强制退出可执行文件时,这些线程会被关闭并清理吗?经过一些测试,当包含 ExecutorService 的代码被强制关闭时,似乎没有抛出 InterruptException。

编辑: 详细来说,任务看起来像

public void run() {
    while(true) {
        // Let queue be a synchronized, global queue
        if (queue has an element) {
            // Pop from queue and do a very minimal amount of work on it
            // Involves a small amount of network IO (maybe 10-100 ms)
        } else {
            sleep(2000);
        }
    }
}

我同意@D Levant,阻塞队列是这里使用的关键。使用阻塞队列,您无需处理队列空或队列满的情况。

在您的任务中 class、

while(true) {
        // Let queue be a synchronized, global queue
        if (queue has an element) {
            // Pop from queue and do a very minimal amount of work on it
            // Involves a small amount of network IO (maybe 10-100 ms)
        } else {
            sleep(2000);
        }
    }

这确实不是一个好方法,它效率低下,因为你的 while 循环不断轮询,即使你已经让线程睡眠(),但它仍然是每次线程唤醒时不必要的上下文切换的开销然后睡觉。

在我看来,您使用 Executors 的方法看起来很适合您的情况。线程的创建显然是一个代价高昂的过程,Executors 为我们提供了为不同的任务重复使用同一个线程的灵活性。 你可以只通过 execute(Runnable)submit(Runnable/Callable) 传递你的任务,然后其余的将由 Executors 内部处理。执行器在内部仅使用阻塞队列概念。

您甚至可以使用 ThreadPoolExecutor class 并在其构造函数中传递所需的参数来创建自己的线程池,在这里您可以传递自己的阻塞队列来保存任务。其余线程管理将根据构造函数中的配置传递来处理,所以如果你对配置参数真的有信心那么你可以去做。

最后一点,如果您不想使用 Java 的内置执行器框架,那么您可以通过使用 BlockingQueue 来保留任务并启动一个线程来设计您的解决方案从这个阻塞队列中取出任务来执行,下面是高级实现:

class TaskRunner {
   private int noOfThreads;  //The no of threads which you want to run always
   private boolean started;
   private int taskQueueSize; //No. of tasks that can be in queue at a time, when try to add more tasks, then you have to wait.
   private BlockingQueue<Runnable> taskQueue;
   private List<Worker> workerThreads;

   public TaskRunner(int noOfThreads, int taskQueueSize) {
      this.noOfThreads = noOfThreads;
      this.taskQueueSize = taskQueueSize;
   }

  //You can pass any type of task(provided they are implementing Runnable)
   public void submitTask(Runnable task) {
      if(!started) {
         init();
      }
      try {
         taskQueue.put(task);
      } catch (InterruptedException e) {
         e.printStackTrace();
      }
   }

   public void shutdown() {
      for(Worker worker : workerThreads){
         worker.stopped = true;
      }
   }

   private void init() {
      this.taskQueue = new LinkedBlockingDeque<>(taskQueueSize);
      this.workerThreads = new ArrayList<>(noOfThreads);
      for(int i=0; i< noOfThreads; i++) {
         Worker worker = new Worker();
         workerThreads.add(worker);
         worker.start();
      }
   }

   private class Worker extends Thread {
      private volatile boolean stopped;
      public void run() {
         if(!stopped) {
            try {
               taskQueue.take().run();
            } catch (InterruptedException e) {
               e.printStackTrace();
            }
         }
      }
   }
}

class Task1 implements Runnable {
   @Override
   public void run() {
      //Your implementation for the task of type 1
   }
}

class Task2 implements Runnable {
   @Override
   public void run() {
      //Your implementation for the task of type 2
   }
}

class Main {

   public static void main(String[] args) {
      TaskRunner runner = new TaskRunner(3,5);
      runner.submitTask(new Task1());
      runner.submitTask(new Task2());
      runner.shutdown();
   }
}