从 ThreadPoolTaskExecutor 获取可调用或将 Runnable 转换为 Callable
get callable from ThreadPoolTaskExecutor or cast Runnable to Callable
我正在使用 ThreadPoolTaskExecutor for executing my tasks which are implemantations of Callable interface. I just want to check in time if task is still in pool (monitoring). How to do that? I know that I can get queue from ThreadPoolExecutor,但如何将 Runnable 转换为 Callable?
基本上我有这个可调用
public interface IFormatter extends Callable<Integer>{
Long getOrderId();
}
我是这样执行的
ThreadPoolExecutor.submit(new Formatter(order));
最后我想在一些异步方法中循环遍历 ExecutorService 队列并检查带有 orderId 的线程是否仍然存在。
由于您似乎想要监视 ExecutorService,因此考虑覆盖 decorateTask()
。然后你可以装饰未来来监控它的状态。
如 中所述,您可以通过手动创建并通过 execute
排队来控制包装 Callable
的 FutureTask
。否则,submit
会将您的 Callable
包装到一个 ExecutorService
-specific 对象中并将其放入队列中,从而无法通过标准 API 查询 Callable
的属性。
使用自定义 FutureTask
class MyFutureTask extends FutureTask<Integer> {
final IFormatter theCallable;
public MyFutureTask(IFormatter callable) {
super(callable);
theCallable=callable;
}
Long getOrderId() {
return theCallable.getOrderId();
}
}
通过threadPoolExecutor.execute(new MyFutureTask(new Formatter(order)));
、
排队
您可以查询队列中的订单ID:
public static boolean isEnqueued(ThreadPoolExecutor e, Long id) {
for(Object o: e.getQueue().toArray()) {
if(o instanceof MyFutureTask && Objects.equals(((MyFutureTask)o).getOrderId(), id))
return true;
}
return false;
}
这适用于任何 ExecutorService
(假设它有一个队列)。如果您仅使用 ThreadPoolExecutor
,您可以自定义 FutureTask
实例的创建(从 Java 6 开始),而不是依赖提交者这样做:
public class MyThreadPoolExecutor extends ThreadPoolExecutor {
public MyThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime,
TimeUnit unit, BlockingQueue<Runnable> workQueue) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
}
public MyThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime,
TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit,
workQueue, threadFactory);
}
public MyThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime,
TimeUnit unit, BlockingQueue<Runnable> workQueue,
RejectedExecutionHandler handler) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit,
workQueue, handler);
}
public MyThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime,
TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit,
workQueue, threadFactory, handler);
}
@Override
protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
if(callable instanceof IFormatter)
return (FutureTask<T>)new MyFutureTask((IFormatter)callable);
return super.newTaskFor(callable);
}
}
然后,使用 MyThreadPoolExecutor
的实例而不是 ThreadPoolExecutor
的每个提交 IFormatter
实例将自动包装使用 MyFutureTask
而不是标准的 FutureTask
.缺点是这仅适用于此特定 ExecutorService
,并且通用方法会针对特殊处理生成未经检查的警告。
我正在使用 ThreadPoolTaskExecutor for executing my tasks which are implemantations of Callable interface. I just want to check in time if task is still in pool (monitoring). How to do that? I know that I can get queue from ThreadPoolExecutor,但如何将 Runnable 转换为 Callable?
基本上我有这个可调用
public interface IFormatter extends Callable<Integer>{
Long getOrderId();
}
我是这样执行的
ThreadPoolExecutor.submit(new Formatter(order));
最后我想在一些异步方法中循环遍历 ExecutorService 队列并检查带有 orderId 的线程是否仍然存在。
由于您似乎想要监视 ExecutorService,因此考虑覆盖 decorateTask()
。然后你可以装饰未来来监控它的状态。
如 execute
排队来控制包装 Callable
的 FutureTask
。否则,submit
会将您的 Callable
包装到一个 ExecutorService
-specific 对象中并将其放入队列中,从而无法通过标准 API 查询 Callable
的属性。
使用自定义 FutureTask
class MyFutureTask extends FutureTask<Integer> {
final IFormatter theCallable;
public MyFutureTask(IFormatter callable) {
super(callable);
theCallable=callable;
}
Long getOrderId() {
return theCallable.getOrderId();
}
}
通过threadPoolExecutor.execute(new MyFutureTask(new Formatter(order)));
、
您可以查询队列中的订单ID:
public static boolean isEnqueued(ThreadPoolExecutor e, Long id) {
for(Object o: e.getQueue().toArray()) {
if(o instanceof MyFutureTask && Objects.equals(((MyFutureTask)o).getOrderId(), id))
return true;
}
return false;
}
这适用于任何 ExecutorService
(假设它有一个队列)。如果您仅使用 ThreadPoolExecutor
,您可以自定义 FutureTask
实例的创建(从 Java 6 开始),而不是依赖提交者这样做:
public class MyThreadPoolExecutor extends ThreadPoolExecutor {
public MyThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime,
TimeUnit unit, BlockingQueue<Runnable> workQueue) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
}
public MyThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime,
TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit,
workQueue, threadFactory);
}
public MyThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime,
TimeUnit unit, BlockingQueue<Runnable> workQueue,
RejectedExecutionHandler handler) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit,
workQueue, handler);
}
public MyThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime,
TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit,
workQueue, threadFactory, handler);
}
@Override
protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
if(callable instanceof IFormatter)
return (FutureTask<T>)new MyFutureTask((IFormatter)callable);
return super.newTaskFor(callable);
}
}
然后,使用 MyThreadPoolExecutor
的实例而不是 ThreadPoolExecutor
的每个提交 IFormatter
实例将自动包装使用 MyFutureTask
而不是标准的 FutureTask
.缺点是这仅适用于此特定 ExecutorService
,并且通用方法会针对特殊处理生成未经检查的警告。