如何扩展 FutureTask 并确保释放对 Callable 的引用?

How can I extend FutureTask and ensure reference to Callable is released?

我有一个自定义的 ExecutorService,其中包含一个 ScheduledExecutorService,如果提交给 ExecutorSerice 的任务花费的时间太长,它可以用来中断任务,我把完整的 class 放在这个 post.

这工作正常,只是有时中断本身会导致问题,所以我将易失性布尔值 cancel 标志添加到新的 CanceableTask class 并制作他们 subclass 这样他们就可以检查并在布尔值被发送到 true 时干净地停止自己。请注意,在提交给执行程序服务 precisley 的每个 class 中,它们都是一个布尔值实例,因此可以在不取消其他任务的情况下取消长时间运行的任务。

然而,FutureTask 作为参数传递给 beforeExecute(Thread t, Runnable r) 并且这不允许访问Callable class 所以我的超时代码无法设置取消标志。

我通过将 newTaskFor 方法覆盖为 return 一个仅提供对 Callable

的引用的 class 来解决这个问题
public class FutureCallable<V> extends FutureTask<V>
{
    private Callable<V> callable;
    public FutureCallable(Callable<V> callable) {
        super(callable);
        this.callable = callable;
    }
    public Callable<V> getCallable() {
        return callable;
    }
}

一切都很好,我是这么认为的。

不幸的是,随着新任务被提交到 ExecutorService 并最终耗尽内存,我的应用程序现在使用越来越多的内存,当我分析应用程序时,我发现有一个线程堆栈本地引用到所有 FutureCallables,即使在任务完成后,由于 FutureCallable 引用了正在运行的 class,它也会占用大量内存。

当我查看 FutureTask 的代码(FutureCallable extends)时,有一条针对私有 Callable 引用的注释

/** The underlying callable; nulled out after running */

那么我该如何改进我的 FutureCallable 以消除它对 Callable 的引用? 或者为什么在任务完成后会维护对 FutureCallable 的引用。

我已经确认,如果我注释掉 newTaskFor 方法,则不会使用过多的内存,但不幸的是,我无法取消 class。

完成Class是:

public class TimeoutThreadPoolExecutor extends ThreadPoolExecutor {
    private final long timeout;
    private final TimeUnit timeoutUnit;

    private final static int WAIT_BEFORE_INTERRUPT = 10000;
    private final static int WAIT_BEFORE_STOP      = 10000;


    private final ScheduledExecutorService timeoutExecutor = Executors.newSingleThreadScheduledExecutor();

    //Map Task to the Future of the Timeout Task that could be used to interrupt it
    private final ConcurrentMap<Runnable, ScheduledFuture> runningTasks = new ConcurrentHashMap<Runnable, ScheduledFuture>();

    public long getTimeout()
    {
        return timeout;
    }

    public TimeUnit getTimeoutUnit()
    {
        return timeoutUnit;
    }

    public TimeoutThreadPoolExecutor(int workerSize, ThreadFactory threadFactory, long timeout, TimeUnit timeoutUnit)
    {
        super(workerSize, workerSize, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(), threadFactory);
        MainWindow.logger.severe("Init:"+workerSize+":Timeout:"+timeout+":"+timeoutUnit);
        this.timeout = timeout;
        this.timeoutUnit = timeoutUnit;
    }

    public TimeoutThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, long timeout, TimeUnit timeoutUnit) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
        this.timeout = timeout;
        this.timeoutUnit = timeoutUnit;
    }

    @Override
    public <T> FutureCallable<T> newTaskFor(Callable<T> callable) {
        return new FutureCallable<T>(callable);
    }

    @Override
    public List<Runnable> shutdownNow() {
        timeoutExecutor.shutdownNow();
        return super.shutdownNow();
    }

    @Override
    protected void beforeExecute(Thread t, Runnable r) {
        if(timeout > 0) {
            //Schedule a task to interrupt the thread that is running the task after time timeout starting from now
            final ScheduledFuture<?> scheduled = timeoutExecutor.schedule(new TimeoutTask(t, r), timeout, timeoutUnit);

            //Add Mapping
            runningTasks.put(r, scheduled);
        }
    }

    @Override
    protected void afterExecute(Runnable r, Throwable t) {

        //AfterExecute will be called after the task has completed, either of its own accord or because it
        //took too long and was interrupted by corresponding timeout task
        //Remove mapping and cancel timeout task
        ScheduledFuture timeoutTask = runningTasks.remove(r);
        if(timeoutTask != null) {
            timeoutTask.cancel(false);
        }

    }

    @Override
    protected void terminated()
    {
        //All tasks have completed either naturally or via being cancelled by timeout task so close the timeout task
        MainWindow.logger.severe("---Shutdown TimeoutExecutor");
        timeoutExecutor.shutdown();
    }

    /**
     * Interrupt or possibly stop the thread
     *
     */
    class TimeoutTask implements Runnable {
        private final       Thread thread;
        private             Callable c;

        public TimeoutTask(Thread thread, Runnable c) {
            this.thread = thread;
            if(c instanceof FutureCallable)
            {
                this.c = ((FutureCallable) c).getCallable();
            }
        }

        @Override
        public void run()
        {
            String msg = "";
            if (c != null)
            {
                if (c != null && c instanceof CancelableTask)
                {
                    MainWindow.logger.severe("+++Cancelling " + msg + " task because taking too long");
                    ((CancelableTask) c).setCancelTask(true);
                }
            }
        }
    }
}

    public abstract class CancelableTask  extends ExecutorServiceEnabledAnalyser
    {
        private volatile boolean cancelTask = false;

        public boolean isCancelTask() {
            return cancelTask;
        }

        public void setCancelTask(boolean cancelTask) {
            this.cancelTask = cancelTask;
        }

        CancelableTask(final MainWindow start, boolean isSelectedRecords, boolean isUseRowSelection)
        {
            super(start, isSelectedRecords, isUseRowSelection);
        }

        CancelableTask(final MainWindow start, List<MetadataChangedWrapper> songs)
        {
            super(start, songs );
        }

    }

这个ThreadLocal在哪里?我觉得很奇怪,很难相信你在说什么,它一直 运行 保留对所有任务的引用,即使在完成后也是如此。如果是这种情况,即使没有您的覆盖,它最终也应该 运行 内存不足(尽管任务本身使用了一些内存,可能少于您的可调用内存所占用的内存,但仍不为零)。

无论如何,您可以覆盖 FutureCallable 上的 done 方法,以便在执行后清空包装对象。