我无法使用 ExecutorService 获得 RejectedExecutionException

I can not get RejectedExecutionException with ExecutorService

我为 RejectedExecutionException 的测试处理创建调度程序:

@Component
public class TestScheduler {

    private final TestService testService;
    private ExecutorService executorService;

    public TestScheduler(TestService testService) {
        this.testService = testService;
    }

    @PostConstruct
    public void init() {
        executorService = Executors.newFixedThreadPool(5);
    }

    @Scheduled(fixedRate = 10L)
    public void test() {
        System.out.println("test");
        executorService.execute(testService::print);
    }
}

延迟 70 秒的服务:

@Component
public class TestService {

    public void print() {
        System.out.println("print start");
        try {
            TimeUnit.SECONDS.sleep(70);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("print end");
    }
}

我等下逻辑:

  1. 调度程序调用 executorService.execute(testService::print) 5 次
  2. 每个testService::print将执行70秒
  3. execute 方法第六次调用时,我得到 RejectedExecutionException

但我没有得到异常。我有这个日志:

test
print start
2018-10-22 11:26:45.543  INFO 5960 --- [           main] c.e.s.SchedullerExceptionsApplication    : Started SchedullerExceptionsApplication in 0.661 seconds (JVM running for 1.108)
test
print start
test
print start
test
print start
test
print start
test
...
70 seconds print test

编辑

在实际项目中我有这样的代码:

@PostConstruct
    public void init() {
        executorService = Executors.newFixedThreadPool(100, new CustomizableThreadFactory("SendRequestExecutor-"));
    }

@Scheduled(fixedDelay = 1000L)
public void sendReady() {
    try {
        List<Message> messages = messageService.findReadyToSend();
        for (Message message : messages) {
            message.setStatus(IN_PROCESS);
            Message savedMessage = messageService.save(message);
            executorService.execute(() -> sendRequestService.send(savedMessage.getGuid()));
        }
    } catch (Exception e) {
        log.error("handle: " + e.getMessage());
    }
}

这是否意味着这段代码是错误的?因为这可能发生所以我将实体更改为状态 IN_PROCESS 并在尝试执行时 - 如果 executorService 已满我没有得到异常并且 executorService 不执行我的任务?

RejectedExecutionException 会抛给你 溢出任务队列(现在是未绑定的),而你希望它在你安排更多任务时被抛出(把它可能进入无界队列)然后你有工作人员 - 那是废话,因为这是工作人员的目的 - 构建队列并从中执行。

要测试错误处理,要么模拟你的执行器并在任务提交时抛出异常(最好),要么使用有限的、有界的、非阻塞队列作为你的执行器的后备队列。

使用 mock 是最简单的方法。

定义执行器涉及两个方面。

  1. 执行器将使用的线程数。 这限制了执行器可以处理的并发任务数运行。这是您通过 Executors.newFixedThreadPool(5).
  2. 设置的内容
  3. 底层任务提交队列的大小。这限制了底层队列在抛出异常之前可以存储的任务数。 newFixedThreadPool 创建的执行器使用无界队列,因此不会出现异常。

您可以通过如下创建自己的执行程序服务并向其提交 11 个任务(5 个使用所有线程,5 个填充底层任务队列,1 个溢出它)来实现您想要的行为。

new ThreadPoolExecutor(5, 
                       5, 
                       2000L, 
                       TimeUnit.MILLISECONDS, 
                       new ArrayBlockingQueue<Runnable>(5, true), 
                       new ThreadPoolExecutor.CallerRunsPolicy());