如何与执行者服务线程通信

how to communicate with Executor Service Threads

从控制器 Class,我调用这个 Helper 来启动一个进程并返回到 UI 进程启动

助手Class:

public class Helper {

public String startService() { //Before starting the service I save the status of the service as Started in the DB

    ExecutorService service = Executors.newSingleThreadExecutor();
    service.submit(new  Runnable() {
        public void run() {
        new Worker().startWork(callableTaskList);   
            }
        });
return "started"
    }
public void stopService() { 
// I Saved the status in DB as Stopping (Just in case). but now how to pass flag an to pass to startWorkMethod to stop if some flag in false and stop processing.
}

工人Class

  public class Worker {

    public void startWork(List<CallableTask> callableTaskList) throws Exception {
        ExecutorService service=Executors.newFixedThreadPool(50);
        ExecutorService anotherService=Executors.newFixedThreadPool(50);
for (List<CallableTask> partition : Iterables.partition(callableTaskList, 500)){
          // do some work here and then return
            List<Future<String>> futures=service.invokeAll(partition );
            for(Future<String> future: futures){
                anotherService.submit(new Task(future.get()));
            }
        }

现在我的问题是如何停止已经启动的服务?由于 callableTaskList 是一个巨大的列表,我将其分成批次并进行处理。现在,如果我想停止这个过程,我该怎么做? 我认为 worker class 中应该有一个标志,如果我应该继续处理它,应该在每个分区 运行 之后检查它。 但是我不明白如何将这个标志传递给工人class。我创建了一个停止服务的方法,我想创建一个易失性原子布尔标志并将其传递给 startWork 方法。但我想只有当它们都是单例对象时它才会起作用。由于单例对象只有一个实例,我可能最终也会停止其他当前 运行ning 服务。 (不确定,需要澄清)。

谢谢。

在每个级别都保留对 ExecutorService 的引用,以便可以调用 shutdownNow()。例如:

    public class Helper {
        private ExecutorService service;

        public String startService() {
           // ExecutorService service = Executors.newSingleThreadExecutor();
            service = Executors.newSingleThreadExecutor();
            service.submit(new  Runnable() {
                public void run() {
                    new Worker().startWork(callableTaskList);   
                }
            });
            return "started"
        }

    public void stopService() { 
        service.shutdownNow();
    }
}

但是,要使其正常工作,API 表示 Callable/Runnable 必须表现良好,并且 在被打断时做出响应

例如:

    public class Worker {
        private ExecutorService service;
        private ExecutorService anotherService;

        public void startWork(List<CallableTask> callableTaskList) throws Exception {
            service=Executors.newFixedThreadPool(50);
            anotherService=Executors.newFixedThreadPool(50);

            for (List<CallableTask> partition : Iterables.partition(callableTaskList, 500)){
                checkInterruptStatus(); 


                // do some work here and then return
                List<Future<String>> futures=service.invokeAll(partition );
                for(Future<String> future: futures){
                    checkInterruptStatus();

                    anotherService.submit(new Task(future.get()));
                }
            }
        }

        private void checkInterruptStatus() throws InterruptedException {
            if (Thread.currentThread().isInterrupted()) {
                throw new InterruptedException();
            } 
        }

        public void stopService() {
            service.shutdownNow();
            anotherService.shutdownNow();
        }
    }