Java ExecutorService - Task/Callable 不是 cancelling/interrupting
Java ExecutorService - Task/Callable not cancelling/interrupting
我正在使用 Java ExecutorService (ThreadPool) 执行任务并更新 UI,而特定的 activity 在前台(可见)。
问题:
我想要的是当用户切换到另一个 activity 我想要 stop/cancel 所有任务(无论是排队还是 运行)。为此,我必须在通过调用 isDone() 检查 Future 对象状态后,对 ExecutorService 提交方法返回的 Future 对象使用 ExecutorService shutdown/shutdownNow 方法或取消(true)。这会将相应的中断线程标志设置为 TRUE,我必须在我的可调用实现中检查 (Thread.currentThread.isInterrupted()) 以确定是否中断退出 task/thread。问题是我在这两种情况下是否调用 ExecutorService 关闭方法或 Future cancel(true) 方法很少有 10 次中有 1 次将线程中断标志设置为 TRUE,这最终导致内存泄漏等。
代码:
ThreadPool Singleton实现(cancelAll-取消任务&shutdownExecutor-关闭ExecutorService):
private static class ThreadPoolManager {
private ExecutorService executorService;
private List<Future> queuedFutures;
private BlockingQueue<Runnable> blockingQueue;
private static ThreadPoolManager instance;
private ThreadPoolManager() {
MyLogger.log(MyLogger.LOG_TYPE.DEBUG, "Threadpool-created(constructor)");
queuedFutures = new ArrayList<>();
blockingQueue = new LinkedBlockingDeque<>();
executorService = new ThreadPoolExecutor(Runtime.getRuntime().availableProcessors(), Runtime.getRuntime().availableProcessors(), 1, TimeUnit.SECONDS, blockingQueue);
}
static {
instance = new ThreadPoolManager();
}
public static void submitItemTest(Callable<Object> callable) {
MyLogger.log(MyLogger.LOG_TYPE.DEBUG, "Threadpool-submitted item test");
if(instance.executorService.isShutdown()){
instance=new ThreadPoolManager();
}
Future future = instance.executorService.submit(callable);
instance.queuedFutures.add(future);
}
public static void submitTestAll(Callable<Object> callable) {
MyLogger.log(MyLogger.LOG_TYPE.DEBUG, "Threadpool-submitted test all");
if(instance.executorService.isShutdown()){
instance=new ThreadPoolManager();
}
cancelAll();
Future future = instance.executorService.submit(callable);
instance.queuedFutures.add(future);
}
public static void cancelAll() {
MyLogger.log(MyLogger.LOG_TYPE.DEBUG, "Cancelling all future tasks");
instance.blockingQueue.clear();
for (Future future : instance.queuedFutures) {
if (!future.isDone()) {
boolean cancelled = future.cancel(true);
MyLogger.log(MyLogger.LOG_TYPE.DEBUG, "Cancelled-" + cancelled);
}
}
instance.queuedFutures.clear();
}
public static void shutdownExecutor(){
MyLogger.log(MyLogger.LOG_TYPE.DEBUG, "Shuttingdown threadpool");
instance.executorService.shutdownNow();
}
}
可调用实现(正常迭代和检查中断的 if 子句):
private Callable<Object> getTestAllCallable() {
return new Callable<Object>() {
@Override
public Object call() {
for (int i = 0; i < inbuiltProxyPojoArrayList.size(); i++) {
if (!Thread.currentThread().isInterrupted()) {
//someWork
} else {
MyLogger.log(MyLogger.LOG_TYPE.DEBUG, "ThreadInterrupted-Cancelling");
return null;
}
}
return null;
}
};
}
Activity/Fragment onStop 实现(用于调用取消任务和关闭):
@Override
public void onStop() {
MyLogger.log(MyLogger.LOG_TYPE.INFO, "onStop called");
ThreadPoolManager.cancelAll();
ThreadPoolManager.shutdownExecutor();
super.onStop();
}
更新:
所做的更改:
不再使用 Runnable 而不是 callable。
现在不为 ExecutorService 使用单例。
private class ThreadPoolManager {
private ExecutorService executorService;
private List<Future> queuedFutures;
private BlockingQueue<Runnable> blockingQueue;
private ThreadPoolManager() {
MyLogger.log(MyLogger.LOG_TYPE.DEBUG, "Threadpool-created(constructor)");
queuedFutures = new ArrayList<>();
blockingQueue = new LinkedBlockingDeque<>();
executorService =getNewExecutorService();
}
private ExecutorService getNewExecutorService(){
return new ThreadPoolExecutor(Runtime.getRuntime().availableProcessors(), Runtime.getRuntime().availableProcessors(), 1, TimeUnit.SECONDS, blockingQueue);
}
private void submitItemTest(Runnable runnable) {
MyLogger.log(MyLogger.LOG_TYPE.DEBUG, "Threadpool-submitted item test");
if(executorService.isShutdown()){
executorService=getNewExecutorService();
}
Future future = executorService.submit(runnable);
queuedFutures.add(future);
}
private void submitTestAll(Runnable runnable) {
MyLogger.log(MyLogger.LOG_TYPE.DEBUG, "Threadpool-submitted test all");
if(executorService.isShutdown()){
executorService=getNewExecutorService();
}
cancelAll();
Future future = executorService.submit(runnable);
queuedFutures.add(future);
}
private void cancelAll() {
MyLogger.log(MyLogger.LOG_TYPE.DEBUG, "Cancelling all future tasks");
blockingQueue.clear();
for (Future future : queuedFutures) {
if (!future.isDone()) {
boolean cancelled = future.cancel(true);
MyLogger.log(MyLogger.LOG_TYPE.DEBUG, "Cancelled-" + cancelled);
}
}
queuedFutures.clear();
}
private void shutdownExecutor(){
MyLogger.log(MyLogger.LOG_TYPE.DEBUG, "Shuttingdown threadpool");
executorService.shutdownNow();
blockingQueue.clear();
queuedFutures.clear();
}
}
找到了罪魁祸首,但还没有找到解决方案。以下 2 是运行的 Runnables 1 的实现(isInterrupted returns true 或出现 InterupptedException 而不是任务结束)但不是其他的。
Working Runnable(我用它来测试):
new Runnable() {
@Override
public void run() {
int i=0;
while(!Thread.currentThread().isInterrupted()){
try {
System.out.println(i);
Thread.currentThread().sleep(2000);
} catch (InterruptedException e) {
MyLogger.log(MyLogger.LOG_TYPE.DEBUG,"Interrupted");
return;
}
i++;
}
}
}
不工作(我想使用的实际代码):
new Runnable(){
@Override
public void run() {
for (int i = 0; i < inbuiltProxyPojoArrayList.size(); i++) {
if (!Thread.currentThread().isInterrupted()) {
} else {
MyLogger.log(MyLogger.LOG_TYPE.DEBUG, "Thread Interrupted (Cancelled)");
break;
}
}
}
};
一种可能的解决方案是在 runnable 中使用变量(布尔值)作为中断标志,我将其视为最后的手段,但很乐意了解错误。
根据 ExecutorService
文档,关闭正在执行的任务是在尽力而为的基础上完成的。
因此,当您调用 ExecutorService.shutdownNow()
时,实现将 尝试 关闭所有当前正在执行的任务。每个任务将保持 运行 直到它到达它检测到它被中断的点。
为了确保您的线程在早期阶段达到该点,最好在您的循环中添加一个线程是否被中断的检查,如下所示:
Thread.currentThread().isInterrupted();
通过在每次迭代中进行此调用,您的线程将检测到与实际中断间隔较短的中断。
因此您修改后的 Callable
代码将如下所示:
private Callable<Object> getTestAllCallable() {
return new Callable<Object>() {
@Override
public Object call() {
for (int i = 0; i < inbuiltProxyPojoArrayList.size(); i++) {
if(Thread.currentThread().isInterrupted()) {
return null;
}
if(someCondition) {
//someWork
} else {
MyLogger.log(MyLogger.LOG_TYPE.DEBUG, "ThreadInterrupted-Cancelling");
return null;
}
}
return null;
}
};
}
顺便说一下,如果您不打算 return 来自 call()
方法的任何值,那么使用 Callable
是没有意义的。如果您在任务中需要参数化类型,只需创建一个参数化 Runnable
,如下所示:
public class ParameterizedRunnable<T> implements Runnable {
private final T t;
public ParameterizedRunnable(T t) {
this.t = t;
}
public void run() {
//do some work here
}
}
解决方案(出路):
所以最后我继续使用自定义内部标志(布尔值)作为线程中断标志,MyRunnable 将在每次迭代时检查它(带有自定义标志的 runnable 的自定义实现,以便有一个与每个 runnable 相关联的标志)。当需要取消 ExecutorService(ThreadPool) 下的线程时,我遍历所有 Future 对象并将其关联到 MyRunnable,然后将其中断标志(自定义标志)设置为 true,这比 interrupts/close thread.
线程池管理器:
private class ThreadPoolManager {
private ExecutorService executorService;
private final Map<Future,MyRunnable> queuedFutures;
private final BlockingQueue<Runnable> blockingQueue;
private ThreadPoolManager() {
MyLogger.log(DEBUG, "Threadpool-created(constructor)");
queuedFutures = new HashMap<>();
blockingQueue = new LinkedBlockingDeque<>();
executorService = getNewExecutorService();
}
private ExecutorService getNewExecutorService() {
return new ThreadPoolExecutor(Runtime.getRuntime().availableProcessors(), Runtime.getRuntime().availableProcessors(), 1, TimeUnit.SECONDS, blockingQueue);
}
private void submitItemTest(MyRunnable runnable) {
MyLogger.log(DEBUG, "Threadpool-submitted item test");
if (executorService.isShutdown()) {
executorService = getNewExecutorService();
}
Future future = executorService.submit(runnable);
queuedFutures.put(future,runnable);
}
private void submitTestAll(MyRunnable runnable) {
MyLogger.log(DEBUG, "Threadpool-submitted test all");
if (executorService.isShutdown()) {
executorService = getNewExecutorService();
}
cancelAll();
Future future = executorService.submit(runnable);
queuedFutures.put(future,runnable);
}
private void cancelAll() {
MyLogger.log(DEBUG, "ThreadPool: Cancelling all future tasks");
blockingQueue.clear();
for (Future future : queuedFutures.keySet()) {
if (!future.isDone()) {
queuedFutures.get(future).continueRunning=false;
MyLogger.log(DEBUG, "Cancelled");
}
}
queuedFutures.clear();
}
private void shutdownExecutor() {
cancelAll();
MyLogger.log(DEBUG, "ThreadPool: Shuttingdown threadpool");
executorService.shutdown();
}
}
MyRunnable(实现Runable的抽象class):
private abstract class MyRunnable implements Runnable {
boolean continueRunning=true;
}
MyRunnable(抽象classMyRunnable的实例):
new MyRunnable() {
@Override
public void run() {
for (int i = 0; i < inbuiltProxyPojoArrayList.size(); i++) {
if (continueRunning) {
//someWork
} else {
MyLogger.log(MyLogger.LOG_TYPE.DEBUG, "ThreadPool: Pool Thread Interrupted (closing down)");
break;
}
}
System.out.println("ThreadPool: Test complete");
}
};
现在,调用 threadPoolManager.shutdownExecutor() shutdown/interrupts 当前所有线程 运行。
我正在使用 Java ExecutorService (ThreadPool) 执行任务并更新 UI,而特定的 activity 在前台(可见)。
问题: 我想要的是当用户切换到另一个 activity 我想要 stop/cancel 所有任务(无论是排队还是 运行)。为此,我必须在通过调用 isDone() 检查 Future 对象状态后,对 ExecutorService 提交方法返回的 Future 对象使用 ExecutorService shutdown/shutdownNow 方法或取消(true)。这会将相应的中断线程标志设置为 TRUE,我必须在我的可调用实现中检查 (Thread.currentThread.isInterrupted()) 以确定是否中断退出 task/thread。问题是我在这两种情况下是否调用 ExecutorService 关闭方法或 Future cancel(true) 方法很少有 10 次中有 1 次将线程中断标志设置为 TRUE,这最终导致内存泄漏等。
代码:
ThreadPool Singleton实现(cancelAll-取消任务&shutdownExecutor-关闭ExecutorService):
private static class ThreadPoolManager {
private ExecutorService executorService;
private List<Future> queuedFutures;
private BlockingQueue<Runnable> blockingQueue;
private static ThreadPoolManager instance;
private ThreadPoolManager() {
MyLogger.log(MyLogger.LOG_TYPE.DEBUG, "Threadpool-created(constructor)");
queuedFutures = new ArrayList<>();
blockingQueue = new LinkedBlockingDeque<>();
executorService = new ThreadPoolExecutor(Runtime.getRuntime().availableProcessors(), Runtime.getRuntime().availableProcessors(), 1, TimeUnit.SECONDS, blockingQueue);
}
static {
instance = new ThreadPoolManager();
}
public static void submitItemTest(Callable<Object> callable) {
MyLogger.log(MyLogger.LOG_TYPE.DEBUG, "Threadpool-submitted item test");
if(instance.executorService.isShutdown()){
instance=new ThreadPoolManager();
}
Future future = instance.executorService.submit(callable);
instance.queuedFutures.add(future);
}
public static void submitTestAll(Callable<Object> callable) {
MyLogger.log(MyLogger.LOG_TYPE.DEBUG, "Threadpool-submitted test all");
if(instance.executorService.isShutdown()){
instance=new ThreadPoolManager();
}
cancelAll();
Future future = instance.executorService.submit(callable);
instance.queuedFutures.add(future);
}
public static void cancelAll() {
MyLogger.log(MyLogger.LOG_TYPE.DEBUG, "Cancelling all future tasks");
instance.blockingQueue.clear();
for (Future future : instance.queuedFutures) {
if (!future.isDone()) {
boolean cancelled = future.cancel(true);
MyLogger.log(MyLogger.LOG_TYPE.DEBUG, "Cancelled-" + cancelled);
}
}
instance.queuedFutures.clear();
}
public static void shutdownExecutor(){
MyLogger.log(MyLogger.LOG_TYPE.DEBUG, "Shuttingdown threadpool");
instance.executorService.shutdownNow();
}
}
可调用实现(正常迭代和检查中断的 if 子句):
private Callable<Object> getTestAllCallable() {
return new Callable<Object>() {
@Override
public Object call() {
for (int i = 0; i < inbuiltProxyPojoArrayList.size(); i++) {
if (!Thread.currentThread().isInterrupted()) {
//someWork
} else {
MyLogger.log(MyLogger.LOG_TYPE.DEBUG, "ThreadInterrupted-Cancelling");
return null;
}
}
return null;
}
};
}
Activity/Fragment onStop 实现(用于调用取消任务和关闭):
@Override
public void onStop() {
MyLogger.log(MyLogger.LOG_TYPE.INFO, "onStop called");
ThreadPoolManager.cancelAll();
ThreadPoolManager.shutdownExecutor();
super.onStop();
}
更新:
所做的更改:
不再使用 Runnable 而不是 callable。
现在不为 ExecutorService 使用单例。
private class ThreadPoolManager { private ExecutorService executorService; private List<Future> queuedFutures; private BlockingQueue<Runnable> blockingQueue; private ThreadPoolManager() { MyLogger.log(MyLogger.LOG_TYPE.DEBUG, "Threadpool-created(constructor)"); queuedFutures = new ArrayList<>(); blockingQueue = new LinkedBlockingDeque<>(); executorService =getNewExecutorService(); } private ExecutorService getNewExecutorService(){ return new ThreadPoolExecutor(Runtime.getRuntime().availableProcessors(), Runtime.getRuntime().availableProcessors(), 1, TimeUnit.SECONDS, blockingQueue); } private void submitItemTest(Runnable runnable) { MyLogger.log(MyLogger.LOG_TYPE.DEBUG, "Threadpool-submitted item test"); if(executorService.isShutdown()){ executorService=getNewExecutorService(); } Future future = executorService.submit(runnable); queuedFutures.add(future); } private void submitTestAll(Runnable runnable) { MyLogger.log(MyLogger.LOG_TYPE.DEBUG, "Threadpool-submitted test all"); if(executorService.isShutdown()){ executorService=getNewExecutorService(); } cancelAll(); Future future = executorService.submit(runnable); queuedFutures.add(future); } private void cancelAll() { MyLogger.log(MyLogger.LOG_TYPE.DEBUG, "Cancelling all future tasks"); blockingQueue.clear(); for (Future future : queuedFutures) { if (!future.isDone()) { boolean cancelled = future.cancel(true); MyLogger.log(MyLogger.LOG_TYPE.DEBUG, "Cancelled-" + cancelled); } } queuedFutures.clear(); } private void shutdownExecutor(){ MyLogger.log(MyLogger.LOG_TYPE.DEBUG, "Shuttingdown threadpool"); executorService.shutdownNow(); blockingQueue.clear(); queuedFutures.clear(); } }
找到了罪魁祸首,但还没有找到解决方案。以下 2 是运行的 Runnables 1 的实现(isInterrupted returns true 或出现 InterupptedException 而不是任务结束)但不是其他的。
Working Runnable(我用它来测试):
new Runnable() {
@Override
public void run() {
int i=0;
while(!Thread.currentThread().isInterrupted()){
try {
System.out.println(i);
Thread.currentThread().sleep(2000);
} catch (InterruptedException e) {
MyLogger.log(MyLogger.LOG_TYPE.DEBUG,"Interrupted");
return;
}
i++;
}
}
}
不工作(我想使用的实际代码):
new Runnable(){
@Override
public void run() {
for (int i = 0; i < inbuiltProxyPojoArrayList.size(); i++) {
if (!Thread.currentThread().isInterrupted()) {
} else {
MyLogger.log(MyLogger.LOG_TYPE.DEBUG, "Thread Interrupted (Cancelled)");
break;
}
}
}
};
一种可能的解决方案是在 runnable 中使用变量(布尔值)作为中断标志,我将其视为最后的手段,但很乐意了解错误。
根据 ExecutorService
文档,关闭正在执行的任务是在尽力而为的基础上完成的。
因此,当您调用 ExecutorService.shutdownNow()
时,实现将 尝试 关闭所有当前正在执行的任务。每个任务将保持 运行 直到它到达它检测到它被中断的点。
为了确保您的线程在早期阶段达到该点,最好在您的循环中添加一个线程是否被中断的检查,如下所示:
Thread.currentThread().isInterrupted();
通过在每次迭代中进行此调用,您的线程将检测到与实际中断间隔较短的中断。
因此您修改后的 Callable
代码将如下所示:
private Callable<Object> getTestAllCallable() {
return new Callable<Object>() {
@Override
public Object call() {
for (int i = 0; i < inbuiltProxyPojoArrayList.size(); i++) {
if(Thread.currentThread().isInterrupted()) {
return null;
}
if(someCondition) {
//someWork
} else {
MyLogger.log(MyLogger.LOG_TYPE.DEBUG, "ThreadInterrupted-Cancelling");
return null;
}
}
return null;
}
};
}
顺便说一下,如果您不打算 return 来自 call()
方法的任何值,那么使用 Callable
是没有意义的。如果您在任务中需要参数化类型,只需创建一个参数化 Runnable
,如下所示:
public class ParameterizedRunnable<T> implements Runnable {
private final T t;
public ParameterizedRunnable(T t) {
this.t = t;
}
public void run() {
//do some work here
}
}
解决方案(出路): 所以最后我继续使用自定义内部标志(布尔值)作为线程中断标志,MyRunnable 将在每次迭代时检查它(带有自定义标志的 runnable 的自定义实现,以便有一个与每个 runnable 相关联的标志)。当需要取消 ExecutorService(ThreadPool) 下的线程时,我遍历所有 Future 对象并将其关联到 MyRunnable,然后将其中断标志(自定义标志)设置为 true,这比 interrupts/close thread.
线程池管理器:
private class ThreadPoolManager {
private ExecutorService executorService;
private final Map<Future,MyRunnable> queuedFutures;
private final BlockingQueue<Runnable> blockingQueue;
private ThreadPoolManager() {
MyLogger.log(DEBUG, "Threadpool-created(constructor)");
queuedFutures = new HashMap<>();
blockingQueue = new LinkedBlockingDeque<>();
executorService = getNewExecutorService();
}
private ExecutorService getNewExecutorService() {
return new ThreadPoolExecutor(Runtime.getRuntime().availableProcessors(), Runtime.getRuntime().availableProcessors(), 1, TimeUnit.SECONDS, blockingQueue);
}
private void submitItemTest(MyRunnable runnable) {
MyLogger.log(DEBUG, "Threadpool-submitted item test");
if (executorService.isShutdown()) {
executorService = getNewExecutorService();
}
Future future = executorService.submit(runnable);
queuedFutures.put(future,runnable);
}
private void submitTestAll(MyRunnable runnable) {
MyLogger.log(DEBUG, "Threadpool-submitted test all");
if (executorService.isShutdown()) {
executorService = getNewExecutorService();
}
cancelAll();
Future future = executorService.submit(runnable);
queuedFutures.put(future,runnable);
}
private void cancelAll() {
MyLogger.log(DEBUG, "ThreadPool: Cancelling all future tasks");
blockingQueue.clear();
for (Future future : queuedFutures.keySet()) {
if (!future.isDone()) {
queuedFutures.get(future).continueRunning=false;
MyLogger.log(DEBUG, "Cancelled");
}
}
queuedFutures.clear();
}
private void shutdownExecutor() {
cancelAll();
MyLogger.log(DEBUG, "ThreadPool: Shuttingdown threadpool");
executorService.shutdown();
}
}
MyRunnable(实现Runable的抽象class):
private abstract class MyRunnable implements Runnable {
boolean continueRunning=true;
}
MyRunnable(抽象classMyRunnable的实例):
new MyRunnable() {
@Override
public void run() {
for (int i = 0; i < inbuiltProxyPojoArrayList.size(); i++) {
if (continueRunning) {
//someWork
} else {
MyLogger.log(MyLogger.LOG_TYPE.DEBUG, "ThreadPool: Pool Thread Interrupted (closing down)");
break;
}
}
System.out.println("ThreadPool: Test complete");
}
};
现在,调用 threadPoolManager.shutdownExecutor() shutdown/interrupts 当前所有线程 运行。