使用线程池时,调用 Future#get 程序挂起

When using a thread pool, call Future#get and the program hangs

我创建了一个线程池,提交了两个任务。为什么我的应用程序在打印 task one ,result: null 后毫无异常地挂起???

   private final static ThreadPoolExecutor executorService = new
        ThreadPoolExecutor(1, 1, 1L, TimeUnit.MINUTES,
        new SynchronousQueue<Runnable>(), new ThreadPoolExecutor.DiscardPolicy());

    public static void main(String[] args) throws Exception {
        Future taskOne = executorService.submit(() -> {
            try {
                Thread.sleep(5000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });
        Future taskTwo = executorService.submit(() -> System.out.println("task two is working"));;
        System.out.println("task one ,result: " + taskOne.get());
        System.out.println("task two, result: " + taskTwo.get());
        executorService.shutdown();
    }

当你提交第二个任务时,因为线程池使用了SynchronousQueue并且maximumPoolSize为1,而第一个任务还没有完成,所以触发了拒绝策略。您正在使用 DiscardPolicy,这意味着线程池不执行任何操作,并且 returns 您的状态始终为 NEW.

FutureTask
    public static class DiscardPolicy implements RejectedExecutionHandler {
        public DiscardPolicy() { }
        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
        }
    }

所以当你调用taskTwo#get()时,你总是会被阻止。 (FutureTask 处于小于 COMPLETING 的状态时将始终被阻塞,参见 FutureTask#get)。

你可以使用AbortPolicy(默认策略),这样当你执行executorService.submit(() - > submit; System.out.println("task two is working"))时,你会立即得到一个RejectedExecutionException.

或使用 Future#get(timeout),在这种情况下,如果您在指定时间内未获得结果,您将获得 TimeoutException

new ThreadPoolExecutor.DiscardPolicy() 提交失败时静默丢弃新任务。这里 taskTwo 想要执行,它永远没有机会执行。

DiscardPolicy() 方法从 RejectedExecutionHandler 接口内部调用 void rejectedExecution(Runnable r, ThreadPoolExecutor executor)

为了更好地理解 taskTwo 线程条件,我已经展示了 CustomRejectedExecutionHandler。由于 taskTwo 被静默丢弃,因此 taskTwo.get() 方法将永远无法 return 数据。

所以超时需要设置为1秒(taskTwo.get(1000, TimeUnit.MILLISECONDS))

package example;

import java.util.concurrent.*;

public class ThreadPoolEx {

    public static void main(String[] args) {
        CustomRejectedExecutionHandler rejectionHandler = new CustomRejectedExecutionHandler();
        ThreadPoolExecutor executorService =
                new ThreadPoolExecutor(1, 1, 1L,
                        TimeUnit.MINUTES,
                        new SynchronousQueue<Runnable>(),
                        rejectionHandler
                );


        Future taskOne = executorService.submit(() -> {
            try {
                System.out.println("taskOne is going to sleep");
                Thread.sleep(2000);
                System.out.println("taskOne is wake up");
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });

        Future taskTwo = executorService.submit(() -> System.out.println("task two is working"));
        try {
            System.out.println("task one ,result: " + taskOne.get());
            System.out.println("isTerminating "+ executorService.isTerminating());
            System.out.println("getActiveCount "+ executorService.getActiveCount());
            System.out.println("is cancelled " + taskTwo.isCancelled());
            System.out.println("is isDone " + taskTwo.isDone());
            System.out.println("task two, result: " + taskTwo.get(1000, TimeUnit.MILLISECONDS));
        } catch (Exception e) {
        }
        executorService.shutdown();
    }
}

class CustomRejectedExecutionHandler implements RejectedExecutionHandler {

    @Override
    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
        System.out.println(r.toString() + " is rejected");
    }
}