Java ExecutorService - 为什么这个程序保留 运行?

Java ExecutorService - why does this program keep running?

我正在尝试构建类似后台任务执行器的东西,如果没有答案,它会在一定时间后终止后台任务(后台任务调用 web 服务,它们可能会超时,但我需要确保它们在以下情况下超时某个时间)

所以我将此作为实验,但如果我 运行 此程序不会终止。我想知道是不是因为后台线程仍然处于活动状态?我怎样才能关闭它?

public class Test {

public static class Task implements Callable<Object> {

    @Override
    public Object call() throws Exception {
        while(true) {}
    }

}

public static void main(String[] args) {
    try {
        Task t = new Task();
        ExecutorService executor = Executors.newSingleThreadExecutor();
        executor.invokeAll(Arrays.asList(t), 5L, TimeUnit.SECONDS);
        executor.shutdown();
        System.out.println("DONE");
    } 
    catch (InterruptedException e) {
        e.printStackTrace();
    }
}

}

ExecutorService 不会杀死 运行 个线程,并且由于线程是作为非守护程序创建的,因此 JVM 不会退出。

发生的事情是,当超时到期时,invokeAll() 返回的期货被 取消,这意味着在未来对象上设置了一个标志,你得到一个 CancellationException 如果您尝试调用 future.get()。然而,invokeAll() 和 shutdown()(或 shutdownNow())都没有做任何事情来终止线程。

请注意,您甚至不能自己终止线程。您所能做的就是设置一些特定于应用程序的标志或调用 Thread.interrupt(),但即使这样也不能保证 the thread terminates.

Winterbe post 对执行者的工作方式有很好的介绍。这是他教程的节选

所以基本上执行者总是继续监听新任务或 callables/runnables 关闭执行器或停止执行器监听的一种方法是中断它正在执行的任何任务。一种方法是调用 future.get() ,当主线程停止时,暂停它并确保在将资源移交给其他线程之前完全执行当前线程

您可能拥有更多线程,并在 InterruptedException 块中编写代码以正常关闭

这是我编写并测试过的示例代码:

import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class ExecutorTest {

    public static void main(String[] args) {

        ExecutorService service = Executors.newWorkStealingPool(10);

        Callable<AccountClass> newInstance = () -> {
            TimeUnit.SECONDS.sleep(3);
            return getAcc(Thread.currentThread().getId());
        };

        // for now only one instance is added to the list
        // List<Callable<AccountClass>> callablesSingleList = Arrays.asList(newInstance);

        // adding multipleCallalbes
        List<Callable<AccountClass>> callablesMultipleList = Arrays.asList(
                () -> {
                    TimeUnit.SECONDS.sleep(3);
                    return getAcc(Thread.currentThread().getId());
                },
                () -> {
                    TimeUnit.SECONDS.sleep(3);
                    return getAcc(Thread.currentThread().getId());
                },
                () -> {
                    TimeUnit.SECONDS.sleep(3);
                    return getAcc(Thread.currentThread().getId());
                });

        try {
            service.invokeAll(callablesMultipleList).stream().map(future -> {
                AccountClass fuClass = null;
                try {
                    fuClass = future.get();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } catch (ExecutionException e) {
                    e.printStackTrace();
                }
                return fuClass;
            }).forEach(getValue -> {
                System.out.println("retunred value:" + getValue);
            });
        } catch (InterruptedException ex) {
            ex.printStackTrace();
        }

    }

    private static AccountClass getAcc(long itr) {
        // probably call DB for every new thread iterator
        System.out.println("getting the current thread" + itr);
        AccountClass ac = new AccountClass();
        ac.setId(itr);
        ac.setName("vv");
        ac.setRole("admin");
        System.out.println("sending the accnt class:" + ac);
        return ac;
    }
}

更新:

另一种关闭执行程序的方法是使用 service.shutDownNow() - > 关闭程序,即使它正在执行。您可以使用 awaitTermination 方法来指定您是否觉得可能需要几分钟才能完成执行然后可能会关闭服务

import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class ExecutorScheduleFixedRate {

    public static void main(String[] args) {

        ScheduledExecutorService service = Executors.newScheduledThreadPool(10);

        Runnable task = () -> {
            getAcc(33);
        };

        service.scheduleWithFixedDelay(task, 10, 5, TimeUnit.SECONDS);

        if (!service.isShutdown()) {
            List<Runnable> list2 = service.shutdownNow();
            System.out.println(list2);
            System.out.println("is shutdonw" + service.isShutdown());
            System.out.println("Do something after the thread execution");
        }

    }

    private static AccountClass getAcc(long itr) {
        // probably call DB for every new thread iterator
        System.out.println("getting the current thread" + itr);
        AccountClass ac = new AccountClass();
        ac.setId(itr);
        ac.setName("vv");
        ac.setRole("admin");
        System.out.println("sending the accnt class:" + ac);
        return ac;
    }

}