ExecuterService 停止处理两个线程中的一个

ExecuterService stopped processing one thread out of two

  1. 我有一个包含 40000 条记录的列表,需要在 for 循环中处理。因为我有一个双处理器系统。我创建了一个这样的固定线程池:

    int threads = Runtime.getRuntime().availableProcessors(); ExecutorService service = Executors.newFixedThreadPool(threads);

  2. 并将我的 ArrayList 分成两个子列表。对于这些子列表中的每一个,我都创建了一个 Callable 来执行相同的功能(涉及遍历子列表并进行一些处理)并且 returns 我是一个 Future 对象。

  3. 我使用 executorServiceObject.submit(callable) 提交了这两个 Callable 并将返回的 Future 对象添加到我的 Future 对象列表中

这是我的问题:

我写了一个System.Out.printLn("Processed Item" +item.id) // consider item as the name of reference variable for current iteration

有一段时间一切都很好,我可以看到两个线程同时工作。但一段时间后,其中一个线程停止处理。只有一个线程是 运行。 (我知道这一点是因为我可以在控制台上看到不再打印给线程 2 的 ID)。

有谁知道这是怎么发生的?我的意思是为什么 ExecutorService 停止了 运行 第二个线程。

提前感谢您的帮助。

添加我之前应该做的示例代码:

public List<Output> processInputs(List<Input> inputs)
        throws InterruptedException, ExecutionException {

    int threads = Runtime.getRuntime().availableProcessors();
    ExecutorService service = Executors.newFixedThreadPool(threads);

    List<Future<Output>> futures = new ArrayList<Future<Output>>();
    for (final Input input : inputs) {
        Callable<Output> callable = new Callable<Output>() {
            public Output call() throws Exception {
                Output output = new Output();
                // process your input here and compute the output
                return output;
            }
        };
        futures.add(service.submit(callable));
    }

    service.shutdown();

    List<Output> outputs = new ArrayList<Output>();
    for (Future<Output> future : futures) {
        outputs.add(future.get());
    }
    return outputs;

Everything was fine for some time and i could see two threads working simultaneously. But after some time, one of the threads have stopped processing. Only one thread is running. (I know this because i can see on the console that the id's given to thread 2 are not being printed anymore).

我怀疑你的处理线程抛出了异常。 Future.get() 方法可以抛出 ExecutionException "if the computation threw an exception".

// the following might throw an exception if the background job threw
outputs.add(future.get());

如果您的 "process your input" 代码抛出 NPE、IOException 等异常,则 Callable 抛出该异常并存储在 Future 中,因此它可以由 get() 方法抛出,但包裹在 ExecutionException 中。这很有用,因此正在等待的线程可以获取并处理(记录等)后台线程抛出的异常。

我不会让您的 processInputs(...) 方法在可能丢失的地方向调用者抛出异常,我会在您的 while 循环中执行类似以下操作:

try {
   outputs.add(future.get());
} catch (InterruptedException ie) {
   // always a good pattern if the thread that is waiting was interrupted
   Thread.currentThread().interrupt();
   return;
} catch (ExecutionException ee) {
   // somehow log the error
   logger.error("Computation failed to process", ee);
   // now continue and get the next future in the list
}

如果您没有捕获并正确处理 ExecutionException 那么处理异常也会终止调用 processInputs(...).

的线程