从 ThreadPoolTask​​Executor 获取可调用或将 Runnable 转换为 Callable

get callable from ThreadPoolTaskExecutor or cast Runnable to Callable

我正在使用 ThreadPoolTaskExecutor for executing my tasks which are implemantations of Callable interface. I just want to check in time if task is still in pool (monitoring). How to do that? I know that I can get queue from ThreadPoolExecutor,但如何将 Runnable 转换为 Callable?

基本上我有这个可调用

public interface IFormatter extends Callable<Integer>{
    Long getOrderId();
}

我是这样执行的

ThreadPoolExecutor.submit(new Formatter(order));

最后我想在一些异步方法中循环遍历 ExecutorService 队列并检查带有 orderId 的线程是否仍然存在。

由于您似乎想要监视 ExecutorService,因此考虑覆盖 decorateTask()。然后你可以装饰未来来监控它的状态。

中所述,您可以通过手动创建并通过 execute 排队来控制包装 CallableFutureTask。否则,submit 会将您的 Callable 包装到一个 ExecutorService-specific 对象中并将其放入队列中,从而无法通过标准 API 查询 Callable 的属性。

使用自定义 FutureTask

class MyFutureTask extends FutureTask<Integer> {
    final IFormatter theCallable;

    public MyFutureTask(IFormatter callable) {
        super(callable);
        theCallable=callable;
    }
    Long getOrderId() {
        return theCallable.getOrderId();
    }
}

通过threadPoolExecutor.execute(new MyFutureTask(new Formatter(order)));

排队

您可以查询队列中的订单ID:

public static boolean isEnqueued(ThreadPoolExecutor e, Long id) {
    for(Object o: e.getQueue().toArray()) {
        if(o instanceof MyFutureTask && Objects.equals(((MyFutureTask)o).getOrderId(), id))
            return true;
    }
    return false;
}

这适用于任何 ExecutorService(假设它有一个队列)。如果您仅使用 ThreadPoolExecutor,您可以自定义 FutureTask 实例的创建(从 Java 6 开始),而不是依赖提交者这样做:

public class MyThreadPoolExecutor extends ThreadPoolExecutor {

    public MyThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime,
                                TimeUnit unit, BlockingQueue<Runnable> workQueue) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
    }
    public MyThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime,
        TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit,
            workQueue, threadFactory);
    }
    public MyThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime,
        TimeUnit unit, BlockingQueue<Runnable> workQueue,
        RejectedExecutionHandler handler) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit,
            workQueue, handler);
    }
    public MyThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime,
        TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory,
        RejectedExecutionHandler handler) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit,
            workQueue, threadFactory, handler);
    }

    @Override
    protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
        if(callable instanceof IFormatter)
            return (FutureTask<T>)new MyFutureTask((IFormatter)callable);
        return super.newTaskFor(callable);
    }
}

然后,使用 MyThreadPoolExecutor 的实例而不是 ThreadPoolExecutor 的每个提交 IFormatter 实例将自动包装使用 MyFutureTask 而不是标准的 FutureTask.缺点是这仅适用于此特定 ExecutorService,并且通用方法会针对特殊处理生成未经检查的警告。