什么终止了我的 Java ExecutorService

What is terminating my Java ExecutorService

我最初是在 ThreadPoolExecutor 的一个更复杂的子类中遇到这个问题的;我已将其简化,现在只保留了一些额外的调试代码,但问题依然存在。

import com.jthink.songkong.cmdline.SongKong;
import com.jthink.songkong.ui.MainWindow;
import com.jthink.songkong.util.SongKongThreadFactory;

import java.util.concurrent.*;
import java.util.logging.Level;



/**
 * A {@link ThreadPoolExecutor} whose core and maximum pool sizes are equal, with diagnostic logging added
 * to the {@code afterExecute}, {@code terminated} and {@code shutdown} hooks.
 *
 * <p>{@link #terminated()} and {@link #shutdown()} cast the configured thread factory to
 * {@code SongKongThreadFactory}, so the pool must be constructed with one.
 */
public class TimeoutThreadPoolExecutor extends ThreadPoolExecutor
{
    /**
     * Creates a pool that uses {@link CallerRunsPolicy} when a task cannot be accepted, i.e. the task is
     * run on the submitting thread.
     *
     * @param workerSize    used as both the core and the maximum pool size
     * @param threadFactory factory for worker threads
     * @param queue         work queue holding tasks that are waiting for a worker
     */
    public TimeoutThreadPoolExecutor(int workerSize, ThreadFactory threadFactory, LinkedBlockingQueue<Runnable> queue)
    {
        super(workerSize, workerSize, 0L, TimeUnit.MILLISECONDS, queue, threadFactory, new CallerRunsPolicy());
    }

    /**
     * Creates a pool with a caller-supplied rejection policy.
     *
     * @param workerSize    used as both the core and the maximum pool size
     * @param threadFactory factory for worker threads
     * @param queue         work queue holding tasks that are waiting for a worker
     * @param reh           handler invoked when a task cannot be accepted
     */
    public TimeoutThreadPoolExecutor(int workerSize, ThreadFactory threadFactory, LinkedBlockingQueue<Runnable> queue, RejectedExecutionHandler reh)
    {
        super(workerSize, workerSize, 0L, TimeUnit.MILLISECONDS, queue, threadFactory, reh);
    }

    /**
     * Wraps every submitted callable in a {@code FutureCallable}.
     *
     * @param callable the task being submitted
     * @return the wrapper that will be queued and run
     */
    @Override
    public <T> FutureCallable<T> newTaskFor(Callable<T> callable) {
        return new FutureCallable<T>(callable);
    }

    /**
     * Calls {@code SongKong.checkIn()} before each task runs; per the original comment this checks that
     * processing has not been paused.
     *
     * @param t the worker thread about to run the task
     * @param r the task about to run
     */
    @Override
    protected void beforeExecute(Thread t, Runnable r) {
        SongKong.checkIn();
        // Chain to the superclass last so that nested overrides of this hook compose properly.
        super.beforeExecute(t, r);
    }

    /**
     * Logs any failure of a completed task.
     *
     * <p>When the task is a {@link Future} its failure is stored inside the future rather than passed in
     * as {@code t}, so the outcome is retrieved with {@code get()} to surface it.
     *
     * @param r the task that has completed
     * @param t the exception that terminated the task, or {@code null} if none reached the worker
     */
    @Override
    protected void afterExecute(Runnable r, Throwable t)
    {
        super.afterExecute(r, t);

        if (t == null && r instanceof Future<?>)
        {
            try
            {
                // Only the exception (if any) is of interest; the result itself is discarded.
                ((Future<?>) r).get();
            }
            catch (CancellationException ce)
            {
                t = ce;
            }
            catch (ExecutionException ee)
            {
                t = ee.getCause();
            }
            catch (InterruptedException ie)
            {
                // Restore the interrupt flag instead of swallowing the interruption.
                Thread.currentThread().interrupt();
            }
        }
        if (t != null)
        {
            MainWindow.logger.log(Level.SEVERE, "AFTER EXECUTE---" + t.getMessage(), t);
        }
    }

    /**
     * Logs, to both loggers, that the executor has terminated together with the current thread's stack
     * trace, so the code path that led to termination can be identified.
     */
    @Override
    protected void terminated()
    {
        //All tasks have completed either naturally or via being cancelled by timeout task so close the timeout task
        final String message = "---Terminated:" + ((SongKongThreadFactory) getThreadFactory()).getName();
        MainWindow.logger.severe(message);
        MainWindow.userInfoLogger.severe(message);

        final StackTraceElement[] stackTrace = Thread.currentThread().getStackTrace();
        for (StackTraceElement frame : stackTrace)
        {
            MainWindow.logger.log(Level.SEVERE, frame.toString());
        }
        for (StackTraceElement frame : stackTrace)
        {
            MainWindow.userInfoLogger.log(Level.SEVERE, frame.toString());
        }
    }

    /**
     * Logs, to both loggers, that shutdown was requested together with the caller's stack trace, then
     * delegates to {@link ThreadPoolExecutor#shutdown()}.
     */
    @Override
    public void shutdown()
    {
        final String message = "---Shutdown:" + ((SongKongThreadFactory) getThreadFactory()).getName();
        MainWindow.logger.severe(message);
        MainWindow.userInfoLogger.severe(message);

        final StackTraceElement[] stackTrace = Thread.currentThread().getStackTrace();
        for (StackTraceElement frame : stackTrace)
        {
            MainWindow.logger.log(Level.SEVERE, frame.toString());
        }
        for (StackTraceElement frame : stackTrace)
        {
            MainWindow.userInfoLogger.log(Level.SEVERE, frame.toString());
        }
        super.shutdown();
    }
}

此 ExecutorService 由下面的类使用。该类允许各实例异步提交任务,并且在所有已提交的任务完成之前不应关闭 ExecutorService。

package com.jthink.songkong.analyse.analyser;

import com.jthink.songkong.preferences.GeneralPreferences;
import com.jthink.songkong.ui.MainWindow;
import com.jthink.songkong.util.SongKongThreadFactory;

import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.logging.Level;

/**
 * Analyser service that runs submitted tasks on a {@link TimeoutThreadPoolExecutor} and lets a caller
 * block until the number of outstanding tasks has dropped to zero.
 *
 * <p>Each {@link #submit} increments a pending counter and each {@link #workDone} decrements it. When a
 * decrement brings the counter to zero a one-shot latch is released, unblocking
 * {@link #awaitCompletion()}.
 *
 * <p>A per-task timeout of 30 minutes is passed to the executor. Per the original author it should only
 * come into play when something is really broken, not under usual circumstances.
 */
public class MainAnalyserService extends AnalyserService
{
    //For monitoring/controlling when finished
    private final AtomicInteger pendingItems = new AtomicInteger(0);
    private final CountDownLatch latch = new CountDownLatch(1);

    //If task has not completed 30 minutes after it started (added to queue) then it should be cancelled
    private static final int TIMEOUT_PER_TASK = 30;

    private static MainAnalyserService mas;

    /**
     * @return the instance most recently built by {@link #create(String)}, or {@code null} if none has
     *         been created yet
     */
    public static MainAnalyserService getInstanceOf()
    {
        return mas;
    }

    /**
     * Builds a new service, remembers it as the shared instance and returns it.
     *
     * @param threadGroup passed through to the constructor
     * @return the newly created service
     */
    public static MainAnalyserService create(String threadGroup)
    {
        mas = new MainAnalyserService(threadGroup);
        return mas;
    }

    /**
     * @param threadGroup passed to the superclass constructor
     */
    public MainAnalyserService(String threadGroup)
    {
        super(threadGroup);
        initExecutorService();
    }

    /**
     * Creates the executor with a worker count taken from the preferences.
     *
     * <p>A configured value of zero means "use the number of available processors". The count is then
     * raised to {@code MIN_NUMBER_OF_WORKER_THREADS} if it is lower, so that even on a single-cpu machine
     * one worker waiting on i/o does not stop all other work.
     */
    @Override
    protected void initExecutorService()
    {
        int workerSize = GeneralPreferences.getInstance().getWorkers();
        if (workerSize == 0)
        {
            workerSize = Runtime.getRuntime().availableProcessors();
        }
        //Even if only have single cpu we still have multithread so we dont just have single thread waiting on I/O
        if (workerSize < MIN_NUMBER_OF_WORKER_THREADS)
        {
            workerSize = MIN_NUMBER_OF_WORKER_THREADS;
        }
        MainWindow.userInfoLogger.severe("Workers Configuration:"+ workerSize);
        MainWindow.logger.severe("Workers Configuration:"+ workerSize);

        // NOTE(review): this six-argument constructor (with timeout and unit) is not declared by the
        // simplified TimeoutThreadPoolExecutor shown earlier in this document - confirm which version
        // of that class this is compiled against.
        executorService = new TimeoutThreadPoolExecutor(workerSize,
                new SongKongThreadFactory(threadGroup),
                new LinkedBlockingQueue<Runnable>(BOUNDED_QUEUE_SIZE),
                TIMEOUT_PER_TASK,
                TimeUnit.MINUTES,
                new EnsureIncreaseCountIfRunOnCallingThread());
    }

    /**
     * @return the live counter of tasks submitted but not yet reported done
     */
    public AtomicInteger getPendingItems()
    {
        return pendingItems;
    }

    /**
     * Rejection handler that runs the rejected task on the calling thread and logs before and after.
     *
     * <p>The handler does not touch the pending counter itself; {@link #submit} has already incremented
     * it before the executor could reject the task.
     */
    class EnsureIncreaseCountIfRunOnCallingThread implements RejectedExecutionHandler
    {
        /**
         * Creates the handler.
         */
        public EnsureIncreaseCountIfRunOnCallingThread() { }

        /**
         * Executes the task on the calling thread unless the executor has been shut down.
         *
         * <p>NOTE(review): when the executor has been shut down the task is silently dropped and nothing
         * here decrements the pending counter - confirm that this is intended.
         *
         * @param r the runnable task requested to be executed; cast to {@code FutureCallable} for logging
         * @param e the executor attempting to execute this task
         */
        @Override
        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
            if (!e.isShutdown())
            {
                try
                {
                    MainWindow.userInfoLogger.severe(">>SubmittedLocally:" + ((FutureCallable<?>) r).getCallable().getClass().getName() + ":" + pendingItems.get());
                    r.run();
                    MainWindow.userInfoLogger.severe(">>CompletedLocally:" + ((FutureCallable<?>) r).getCallable().getClass().getName() + ":" +  pendingItems.get());
                }
                catch(Exception ex)
                {
                    MainWindow.userInfoLogger.log(Level.SEVERE, ex.getMessage(), ex);
                }
            }
        }
    }

    /**
     * Increments the pending counter and then submits the task to the executor.
     *
     * @param callingTask not used by this method
     * @param task        the task to run
     */
    public void submit(Callable<Boolean> callingTask, Callable<Boolean> task) //throws Exception
    {
        //Ensure we increment before calling submit in case rejectionExecution comes into play
        int remainingItems = pendingItems.incrementAndGet();
        executorService.submit(task);
        MainWindow.userInfoLogger.severe(">>Submitted:" + task.getClass().getName() + ":" + remainingItems);
    }

    /**
     * @return the executor that tasks are submitted to
     */
    public ExecutorService getExecutorService()
    {
        return executorService;
    }

    /**
     * Must be called by a Callable when it has finished its work (or failed).
     *
     * <p>Decrements the pending counter and releases the latch if the counter is now zero.
     *
     * <p>NOTE(review): the latch is released the first time the counter reaches zero. If tasks are still
     * being submitted at that moment, {@link #awaitCompletion()} returns while work is outstanding.
     *
     * @param task the task reporting completion; only used for logging
     */
    public void workDone(Callable<?> task)
    {
        int remainingItems = pendingItems.decrementAndGet();
        MainWindow.userInfoLogger.severe(">>WorkDone:" + task.getClass().getName() + ":" +remainingItems);
        if (remainingItems == 0)
        {
            MainWindow.userInfoLogger.severe(">Closing Latch:");
            latch.countDown();
        }
    }

    /**
     * Blocks until the latch has been released by {@link #workDone}.
     *
     * @throws InterruptedException if the waiting thread is interrupted
     */
    public void awaitCompletion() throws InterruptedException{
        latch.await();
    }
}

调用方的类中有如下代码:

// Block until the analyser service releases its completion latch, i.e. until it considers all of the
// asynchronously submitted tasks to have completed or failed, then log that the analyser is done.
analyserService.awaitCompletion();
MainWindow.userInfoLogger.severe(">MainAnalyser Completed");

在一位客户那里,即使仍有任务尚未完成,terminated() 方法还是被调用了;此时 ExecutorService 只运行了 8 分钟,也没有任何任务超时。我在本地也复现了这个问题。

调试显示

用户日志

05/07/2019 11.29.38:EDT:SEVERE: ----G14922:The Civil War:8907617:American Songs of Revolutionary Times and the Civil War Era:NoScore
05/07/2019 11.29.38:EDT:SEVERE: >>Submitted:com.jthink.songkong.analyse.analyser.SongSaver:69
05/07/2019 11.29.38:EDT:SEVERE: >>WorkDone:com.jthink.songkong.analyse.analyser.DiscogsSongGroupMatcher:68
05/07/2019 11.29.38:EDT:SEVERE: >MainAnalyser Finished
05/07/2019 11.29.38:EDT:INFO: Stop

调试日志

   05/07/2019 11.29.38:EDT:TimeoutThreadPoolExecutor:terminated:SEVERE: ---Terminated:Worker

所以我们可以看到还有 68 个任务要完成,MainAnalyser 还没有关闭闩锁,但是线程池执行器已经终止

我重写了 shutdown() 以查看它是否被调用,结果它并没有被调用。

terminated() 是经由 runWorker() 被调用的。runWorker() 本应持续循环直到队列为空,但实际并非如此:似乎有什么原因使它退出了循环,随后 processWorkerExit() 在做了进一步检查后最终终止了整个执行器(而不仅仅是该工作线程)。

10/07/2019 07.11.51:BST:MainAnalyserService:submit:SEVERE: >>Submitted:com.jthink.songkong.analyse.analyser.DiscogsSongGroupMatcher:809
10/07/2019 07.11.51:BST:MainAnalyserService:workDone:SEVERE: >>WorkDone:com.jthink.songkong.analyse.analyser.MusicBrainzSongGroupMatcher2:808
10/07/2019 07.11.51:BST:TimeoutThreadPoolExecutor:terminated:SEVERE: ---Terminated:Worker
10/07/2019 07.11.51:BST:TimeoutThreadPoolExecutor:terminated:SEVERE: java.base/java.lang.Thread.getStackTrace(Unknown Source)
10/07/2019 07.11.51:BST:TimeoutThreadPoolExecutor:terminated:SEVERE: com.jthink.songkong.analyse.analyser.TimeoutThreadPoolExecutor.terminated(TimeoutThreadPoolExecutor.java:118)
10/07/2019 07.11.51:BST:TimeoutThreadPoolExecutor:terminated:SEVERE: java.base/java.util.concurrent.ThreadPoolExecutor.tryTerminate(Unknown Source)
10/07/2019 07.11.51:BST:TimeoutThreadPoolExecutor:terminated:SEVERE: java.base/java.util.concurrent.ThreadPoolExecutor.processWorkerExit(Unknown Source)
10/07/2019 07.11.51:BST:TimeoutThreadPoolExecutor:terminated:SEVERE: java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
10/07/2019 07.11.51:BST:TimeoutThreadPoolExecutor:terminated:SEVERE: java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
10/07/2019 07.11.51:BST:TimeoutThreadPoolExecutor:terminated:SEVERE: java.base/java.lang.Thread.run(Unknown Source)

因为 ThreadPoolExecutor 是标准 Java 的一部分,我无法(轻易地)设置断点来弄清它在做什么。下面是 ThreadPoolExecutor 的代码(标准 Java 代码,不是我的代码):

/**
 * Main loop of a worker thread: runs the worker's first task (if any) and then keeps fetching tasks
 * with getTask() until it returns null, wrapping each task in the beforeExecute/afterExecute hooks.
 * processWorkerExit is always invoked when the loop ends, whether normally or because of an exception.
 *
 * @param w the worker whose loop is being run on the current thread
 */
final void runWorker(Worker w) {
    Thread wt = Thread.currentThread();
    Runnable task = w.firstTask;
    w.firstTask = null;
    w.unlock(); // allow interrupts
    // Stays true unless the loop below finishes without an exception escaping it.
    boolean completedAbruptly = true;
    try {
        // Run the initial task first; afterwards keep asking getTask() until it yields null.
        while (task != null || (task = getTask()) != null) {
            w.lock();
            // If pool is stopping, ensure thread is interrupted;
            // if not, ensure thread is not interrupted.  This
            // requires a recheck in second case to deal with
            // shutdownNow race while clearing interrupt
            if ((runStateAtLeast(ctl.get(), STOP) ||
                 (Thread.interrupted() &&
                  runStateAtLeast(ctl.get(), STOP))) &&
                !wt.isInterrupted())
                wt.interrupt();
            try {
                beforeExecute(wt, task);
                // Captured so that afterExecute can be told what (if anything) the task threw.
                Throwable thrown = null;
                try {
                    task.run();
                } catch (RuntimeException x) {
                    thrown = x; throw x;
                } catch (Error x) {
                    thrown = x; throw x;
                } catch (Throwable x) {
                    // Any other Throwable is rethrown wrapped in an Error.
                    thrown = x; throw new Error(x);
                } finally {
                    // Runs whether the task returned normally or threw.
                    afterExecute(task, thrown);
                }
            } finally {
                // Per-task bookkeeping happens even when the task or a hook threw.
                task = null;
                w.completedTasks++;
                w.unlock();
            }
        }
        // Reached only when the loop ended because no further task was available.
        completedAbruptly = false;
    } finally {
        processWorkerExit(w, completedAbruptly);
    }
}

我们试验过执行器的队列大小。默认值是 100,因为我不希望队列变得太大——排队的任务会占用更多内存,队列繁忙时我宁愿让调用方线程自己运行任务。但为了排查这个问题(同时避免因队列已满而触发 CallerRunsPolicy),我把队列大小增加到 1000,结果错误反而出现得更快;之后我完全取消了队列大小限制,失败仍然来得更快。

 new LinkedBlockingQueue<Runnable>(BOUNDED_QUEUE_SIZE),

我在寻找 ThreadPoolExecutor 的替代品时发现了 ForkJoinPool - https://docs.oracle.com/javase/7/docs/api/java/util/concurrent/ForkJoinPool.html

我注意到的一点是:在已提交给 ForkJoinPool 的任务内部提交任务,与从外部提交任务相比,ForkJoinPool 使用的是不同的方法。我不明白为什么要这样,但想知道:我正是在由 Executor 运行的任务内部提交任务,这是否会以某种方式引发问题?

我现在已经设法创建了自己的 ThreadPoolExecutor 版本:只是把代码复制粘贴到一个新类中并重命名;此外还必须创建一个以我的类(而不是 ThreadPoolExecutor)为参数的 RejectedExecutionHandler 版本。现在它已经可以运行了。

开始添加一些调试以查看我是否可以破译发生了什么,有什么想法吗?

在调用 processWorkerExit 之前我添加了

 // Debug line emitted just before processWorkerExit: logs, separated by colons, the executor's task
 // count, its active count, this worker's completed-task count and the completedAbruptly flag.
 MainWindow.userInfoLogger.severe("-----------------------"+getTaskCount()
                    +":"+getActiveCount()
                    +":"+w.completedTasks
                    +":"+ completedAbruptly);

失败了

-----------------------3686:0:593:false

很长一段时间我都认为问题一定出在我的代码上,后来又开始怀疑问题出在 ThreadPoolExecutor 上;但是在我自己的 runWorker() 版本中添加调试后发现,问题确实出在我自己的代码上。

 final void runWorker(Worker w) {
        Thread wt = Thread.currentThread();
        Runnable task = w.firstTask;
        w.firstTask = null;
        w.unlock(); // allow interrupts
        boolean completedAbruptly = true;
        try {
            while (task != null || (task = getTask()) != null) {
                MainWindow.userInfoLogger.severe("-----------------------"+workQueue.size());

从这里我可以看出,工作队列通常会变得越来越长,并且其长度与下面这个值相符:

MainAnalyserService.pendingItems - noOfWorkerThreads

在某个特定的时刻,这两个值出现了分歧,而这正是 SongLoader 进程(我之前并没有真正考虑到它)完成的时候。此后 MainAnalyserService 继续提交工作,pendingItems 的值不断增加,但 Executor 的工作队列却越来越短。

这让我们意识到 Executor 其实早就被 shutdown() 了,只是我们当时没有发现,因为只有在 SongLoader 关闭之后才会检查闩锁。

它被关闭的原因是:早些时候 MainAnalyserService 完成工作的速度比 SongLoader 提交工作的速度更快,于是 pendingItems 的值一度暂时变为零,从而使闩锁得以关闭。

解决方法如下

添加一个布尔标志以指示 songLoader 何时完成,并且仅在设置此标志后才允许关闭闩锁。

// Set once the SongLoader has finished submitting work. It is written by the thread that drives the
// SongLoader and read by worker threads in workDone(), so it is volatile to make the write visible.
private volatile boolean songLoaderCompleted = false;

/**
 * Must be called by a Callable when it has finished its work (or failed).
 *
 * <p>Decrements the pending counter and releases the latch only when the counter is zero AND the
 * SongLoader has been flagged as completed. A transient zero while songs are still being loaded
 * therefore no longer releases the latch.
 *
 * <p>NOTE(review): if the counter reaches zero before the flag is set and no further task finishes
 * afterwards, nothing in this method releases the latch. The setter (not shown here) presumably has to
 * re-check the counter after setting the flag - confirm.
 *
 * @param task the task reporting completion; only used for logging
 */
public void workDone(Callable<?> task)
{
    int remainingItems = pendingItems.decrementAndGet();
    MainWindow.logger.severe(">>WorkDone:" + task.getClass().getName() + ":" +remainingItems);

    if (remainingItems == 0 && songLoaderCompleted)
    {
        MainWindow.logger.severe(">Closing Latch:");
        latch.countDown();
    }
}

然后在 SongLoader 完成后在主线程中设置此标志

 // Start the SongLoader on its own executor.
ExecutorService songLoaderService = SongLoader.getExecutorService();
songLoaderService.submit(loader);

// Per the original author, SongLoader uses a CompletionService when it calls LoadFolderWorkers, so the
// loader task does not finish until all folder submissions to the MainAnalyserService have been made.
// shutdown() itself only stops new submissions and returns at once; awaitTermination() does the waiting.
// NOTE(review): the boolean result of awaitTermination() is ignored, so a timeout would go unnoticed.
songLoaderService.shutdown();
songLoaderService.awaitTermination(10, TimeUnit.DAYS);
MainWindow.userInfoLogger.severe(">Song Loader Finished");

// We are now allowed to consider closing the latch: all songs have been loaded, so the pending count
// can no longer hit a false (transient) zero.
analyserService.setSongLoaderCompleted();

// Wait for all the async tasks submitted to the analyser to complete or fail.
analyserService.awaitCompletion();
MainWindow.userInfoLogger.severe(">MainAnalyser Completed");

// This should be immediate as there should be no tasks still remaining.
// NOTE(review): the awaitTermination() result is ignored here as well.
analyserService.getExecutorService().shutdown();
analyserService.getExecutorService().awaitTermination(10, TimeUnit.DAYS);

你只是在误用 ExecutorService。

你现在的做法(即使在你的“解决方案”中)是这样的:

  • 提交任务
  • 等他们完成
  • 关闭执行器
  • 再次等待关闭完成(为什么要这样做?)

你应该做的是:

  • 提交任务
  • 关闭执行器以不允许任何新任务
  • 等待终止 - 这将阻塞直到所有任务完成或达到超时

您应该检查 awaitTermination 的返回值,因为:

  • 如果为真 - 所有任务都在给定超时前完成
  • 如果为 false - 尚未完成所有任务 - 在这种情况下您可能不应该启动第二个池。

另外,使用线程执行器还有两种方式。一种是生成工作线程并让它们自行决定该做什么——就像您现在在工作线程中循环处理新任务那样;

另一种(我更喜欢这种)是把工作线程应该做的事情(很可能就是您循环体中的内容)封装成独立的任务,逐个提交到线程池中,由 ExecutorService 为您调度。