`ThreadPoolTask​​Executor` 线程在 Spring 中执行后未被杀死

`ThreadPoolTaskExecutor` Threads are not killed after execution in Spring

我正在尝试将 Quartz 顺序执行更改为并行执行。

它工作正常,性能方面,看起来不错,但生成的(创建的)线程没有被销毁。

仍处于Runnable状态;为什么以及我该如何解决? 请指导我。

代码在这里:

    @Override
    protected void executeInternal(JobExecutionContext context) throws JobExecutionException {
        logger.error("Result Processing executed");
        List<Object[]> lstOfExams = examService.getExamEntriesForProcessingResults();
        String timeZone = messageService.getMessage("org.default_timezone", null, Locale.getDefault());
        if(lstOfExams!=null&&!lstOfExams.isEmpty()){
            ThreadPoolTaskExecutor threadPoolExecuter = new ThreadPoolTaskExecutor();
            threadPoolExecuter.setCorePoolSize(lstOfExams.size());
            threadPoolExecuter.setMaxPoolSize(lstOfExams.size()+1);
            threadPoolExecuter.setBeanName("ThreadPoolTaskExecutor");
            threadPoolExecuter.setQueueCapacity(100);
            threadPoolExecuter.setThreadNamePrefix("ThreadForUpdateExamResult");
            threadPoolExecuter.initialize();

            for(Object[] obj : lstOfExams){
                if(StringUtils.isNotBlank((String)obj[2]) ){
                    timeZone = obj[2].toString();
                }
                try {
                    Userexams userexams=examService.findUserExamById(Long.valueOf(obj[0].toString()));
                    if(userexams.getExamresult()==null){
                        UpdateUserExamDataThread task=new UpdateUserExamDataThread(obj,timeZone);
                        threadPoolExecuter.submit(task);
                    }
//                  testEvaluator.generateTestResultAsPerEvaluator(Long.valueOf(obj[0].toString()), obj[4].toString(), obj[3]==null?null:obj[3].toString(),timeZone ,obj[5].toString() ,obj[1].toString()); 
//                  logger.error("Percentage Marks:::::"+result.getPercentageCatScore());
                } catch (Exception e) {
                    Log.error("Exception at ResultProcessingJob extends QuartzJobBean executeInternal(JobExecutionContext context) throws JobExecutionException",e);
                    continue;
                }

            }
            threadPoolExecuter.shutdown();
        }
}

UpdateUserExamDataThread .class

@Component
//@Scope(value="prototype", proxyMode=ScopedProxyMode.TARGET_CLASS)
//public class UpdateUserExamDataThread extends ThreadLocal<String> //implements Runnable {
public class UpdateUserExamDataThread implements Runnable {
    private Logger log = Logger.getLogger(UpdateUserExamDataThread.class);
    @Autowired
    ExamService examService;
    @Autowired
    TestEvaluator testEvaluator;
    private Object[] obj;
    private String timeZone;


    public UpdateUserExamDataThread(Object[] obj,String timeZone) {
        super();
        this.obj = obj;
        this.timeZone = timeZone;
    }

    @Override
    public void run() {
        String threadName=String.valueOf(obj[0]);
        log.info("UpdateUserExamDataThread Start For:::::"+threadName);
        testEvaluator.generateTestResultAsPerEvaluator(Long.valueOf(obj[0].toString()), obj[4].toString(), obj[3]==null?null:obj[3].toString(),timeZone ,obj[5].toString() ,obj[1].toString());
        //update examResult
        log.info("UpdateUserExamDataThread End For:::::"+threadName);
    }

}

TestEvaluatorImpl.java

@Override
    @Transactional
    public Examresult generateTestResultAsPerEvaluator(Long userExamId, String evaluatorType, String codingLanguage,String timeZoneFollowed ,String inctenceId ,String userId) {
        dbSchema = messageService.getMessage("database.default_schema", null, Locale.getDefault());

        try {
//Some Methods
return examResult;
}catch(Exception e){
log.erorr(e);
}
}

如果需要,我可以提供线程转储文件。

我怀疑您的线程之一无限期地等待 IO 请求答复。例如,您尝试连接到未设置连接超时的远程主机,但主机没有应答。在这种情况下,您可以通过底层 ExecutorService 的 运行 shutdownNow 方法强制关闭所有正在执行的任务,然后您可以分析有问题的线程抛出的 InterruptedIOException

替换

threadPoolExecuter.shutdown();

在下面,以便您检查错误。

ExecutorService executorService = threadPoolExecuter.getThreadPoolExecutor();
executorService.shutdownNow();

这将向所有 运行 个线程发送 interrupt 信号。

看来您创建的线程池与考试大小相同,这并不是最佳选择。

    // Core pool size is = number of exams  
    threadPoolExecuter.setCorePoolSize(lstOfExams.size());

    // Max pool size is just 1 + exam size. 
    threadPoolExecuter.setMaxPoolSize(lstOfExams.size()+1);

你必须考虑: - 如果您创建一个线程池并启动它,则立即启动核心大小中定义的线程数。

  • 最大池大小仅在您提交的数量超过核心池线程现在可以处理的数量并且队列已满(在本例中为 100)时才有效。因此,这意味着只有当提交的任务数量超过 100+exam 大小时,才会创建一个新线程。

在你的情况下,我会将核心池大小设置为 5 或 10(这实际上取决于你的目标 CPU 有多少核心 and/or 提交任务的 IO 绑定程度)。

最大池大小可以是它的两倍,但在队列已满之前它不会生效。

要在提交的工作完成后减少活动线程的大小,您必须设置 2 个参数。

  • setKeepAliveSeconds(int keepAliveSeconds) :如果线程在定义的秒数(默认为 60 秒,即最佳)但这通常只用于关闭非核心池线程的线程。

  • 要在 keepAliveSeconds 之后关闭核心部分的线程,您必须将 setAllowCoreThreadTimeOut(boolean allowCoreThreadTimeOut) 设置为 true。只要应用程序是 运行.

  • ,保持核心池活动通常是错误的

希望对您有所帮助。

线程不等待来自某个远程服务器的 IO,因为线程上执行的方法将在某些 jdbc 驱动程序 类 中,但它们目前都在 UpdateUserExamDataThread.run(),第 37 行。

现在的问题是:UpdateUserExamDataThread.java 第 37 行的代码是什么? 不幸的是,目前给出的 UpdateUserExamDataThread.java 是不完整的 and/or 不是真正执行的版本:缺少包声明,它在第 29 行结束。

我怀疑问题仅仅是因为您在使用 submit() 生成任务线程时调用了 运行() 而不是 execute()。使用 submit 时可能会有一些期望,线程会在任务完成时自行终止,而不是在 运行 方法结束时终止。

只需要增加线程的优先级并根据处理器中的核心数创建线程数。

protected void executeInternal(JobExecutionContext context) throws JobExecutionException {
        logger.error("Result Processing executed");
        List<Object[]> lstOfExams = examService.getExamEntriesForProcessingResults();
        String timeZone = messageService.getMessage("org.default_timezone", null, Locale.getDefault());
        int cores = Runtime.getRuntime().availableProcessors();
        if(lstOfExams!=null&&!lstOfExams.isEmpty()){
            ThreadPoolTaskExecutor threadPoolExecuter = new ThreadPoolTaskExecutor();
            threadPoolExecuter.setCorePoolSize(cores);
//          threadPoolExecuter.setMaxPoolSize(Integer.MAX_VALUE);
            threadPoolExecuter.setBeanName("ThreadPoolTaskExecutor");
//          threadPoolExecuter.setQueueCapacity(Integer.MAX_VALUE);
            threadPoolExecuter.setQueueCapacity(lstOfExams.size()+10);
            threadPoolExecuter.setThreadNamePrefix("ThreadForUpdateExamResult");
            threadPoolExecuter.setWaitForTasksToCompleteOnShutdown(true);
            threadPoolExecuter.setThreadPriority(10);
            threadPoolExecuter.initialize();


            for(Object[] obj : lstOfExams){
                if(StringUtils.isNotBlank((String)obj[2]) ){
                    timeZone = obj[2].toString();
                }
                try {
                    Userexams userexam=examService.findUserExamById(Long.valueOf(obj[0].toString()));
                    if(userexam.getExamresult()==null){
                        UpdateUserExamDataThread task=new UpdateUserExamDataThread(obj,timeZone,testEvaluator);
//                      threadPoolExecuter.submit(task);
                        threadPoolExecuter.execute(task);
                    }
//                  testEvaluator.generateTestResultAsPerEvaluator(Long.valueOf(obj[0].toString()), obj[4].toString(), obj[3]==null?null:obj[3].toString(),timeZone ,obj[5].toString() ,obj[1].toString()); 
//                  logger.error("Percentage Marks:::::"+result.getPercentageCatScore());
                } catch (Exception e) {
                    logger.error("Exception at ResultProcessingJob extends QuartzJobBean executeInternal(JobExecutionContext context) throws JobExecutionException",e);
                    continue;
                }
            }
                threadPoolExecuter.shutdown();
        }
}