处理线程池中的优先级

Handling priority in ThreadPools

目前,我们的一个应用程序中有 2 个线程池:

  1. 第一个是用来处理定时任务的
  2. 第二个处理每个计划任务的并行处理 运行

需要设置两个不同的池来自以下想法:如果多个计划任务在主(第一个)池中排队,并且它触发了同一个池中的子任务(并行处理),这将导致竞争条件也会让其他计划任务排队"behind",所以实际上什么都不会结束并且会发生死锁。

如果子任务的优先级高于计划任务怎么办?他们会 "jump" 排队并暂停计划任务以完成吗?或者那不会发生?有什么办法可以强制这种行为吗?或者当 ThreadPoolExecutor 已经 运行 正在执行任务时不能暂停任务?

池 1 在 Spring 的应用程序上下文 XML 配置文件中定义为:

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:task="http://www.springframework.org/schema/task"  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
 xmlns:context="http://www.springframework.org/schema/context"  xmlns:aop="http://www.springframework.org/schema/aop"
xsi:schemaLocation="http://www.springframework.org/schema/beans
 http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
 http://www.springframework.org/schema/aop
 http://www.springframework.org/schema/aop/spring-aop-3.0.xsd
 http://www.springframework.org/schema/context
 http://www.springframework.org/schema/context/spring-context-3.0.xsd
 http://www.springframework.org/schema/task
 http://www.springframework.org/schema/task/spring-task-3.0.xsd">

    <context:annotation-config />
    <context:component-scan base-package="cl.waypoint.mailer.reportes" />
    <task:annotation-driven scheduler="myScheduler" />
    <task:scheduler id="myScheduler" pool-size="2" />
    <aop:aspectj-autoproxy />


</beans>

池2在代码中定义如下:

public static ThreadPoolExecutor executor = new ThreadPoolExecutor(10, 10, 30, TimeUnit.SECONDS,
        new LinkedBlockingDeque<Runnable>(), new ThreadFactory() {
            final AtomicLong count = new AtomicLong(0);
            private String namePreffix = "TempAndDoor";

            @Override
            public Thread newThread(Runnable r) {
                Thread t = new Thread(r);
                t.setDaemon(false);
                t.setPriority(Thread.NORM_PRIORITY);
                t.setName(MessageFormat.format("{0}-{1}", namePreffix, count.getAndIncrement()));
                return t;
            }
        });

如果我理解正确 myScheduler (pool1) 用于安排任务,一旦时间开始,它将任务提交给 执行者(pool2)。

提出问题

如果子任务的优先级高于计划任务怎么办?

不清楚您如何区分它们或您的真正意思,但我的理解是子任务是计划任务。

我想你应该有一些像这样的代码:

@Scheduled(cron="*/5 * * * * MON-FRI")
public void doSomething() {
    executor.submit(aTaskThatRunsInParallelOrWhatYouCallASubTask);
}

其中 executor 是您要在其中创建的静态对象。(不应该全部大写和结尾 ;))

根据它的长名称,你提交给执行者的是"sub tasks"或者执行者醒来提交的任务。在这种情况下,您的 myScheduler 将始终及时唤醒,因为执行的是非阻塞的。

另一方面,因为你有一个 LinkedBlockingDeque 这意味着按顺序执行,你的 executor 可能会得到备份。

另一个问题:

他们会"jump"排队并暂停计划任务以完成吗?

它们不会跳转,调度程序启动,提交新任务并再次进入休眠状态。队列开始填满

下一题:

有什么方法可以强迫这种行为吗?或者当 ThreadPoolExecutor 已经是 运行 任务时不能暂停任务?

您可以一起取消任务,但您要跟踪所有提交的任务

你可以抓住未来

Future aFuture = executor.submit(aTaskThatRunsInParallelOrWhatYouCallASubTask);

并且您需要以某种方式知道您实际上想要取消任务,您需要调用

aFuture.cancel();

看你需要的东西比较复杂,建议看JMS,比较成熟,或者AKKA,可能比较容易掌握。

以下一段代码可能适合您。

com.job.Job

package com.job;

import java.util.concurrent.CountDownLatch;

public class Job implements Runnable {

    public enum JobPriority {
        HIGH, MEDIUM, LOW
    }

    private String jobName;
    private JobPriority jobPriority;
    private CountDownLatch cdl;

    public Job(String jobName, JobPriority jobPriority) {
        super();
        this.jobName = jobName;
        this.jobPriority = jobPriority;
    }

    @Override
    public void run() {
        try {
            System.out.println("Job:" + jobName + " Priority:" + jobPriority);
            Thread.sleep(3000);

        } catch (InterruptedException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        } finally {
            cdl.countDown();
        }
    }

    public String getJobName() {
        return jobName;
    }

    public void setJobName(String jobName) {
        this.jobName = jobName;
    }

    public JobPriority getJobPriority() {
        return jobPriority;
    }

    public void setJobPriority(JobPriority jobPriority) {
        this.jobPriority = jobPriority;
    }

    public void initCdl(CountDownLatch countDownLatch) {
        this.cdl = countDownLatch;
    }

    // standard setters and getters

}

com.scheduler.PriorityJobScheduler

package com.scheduler;

import java.util.Comparator;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.PriorityBlockingQueue;

import com.job.Job;

public class PriorityJobScheduler {

    private ExecutorService priorityJobPoolExecutor;
    private ExecutorService priorityJobScheduler = Executors.newSingleThreadExecutor();
    private PriorityBlockingQueue<Job> priorityQueue;
    private CountDownLatch countDownLatch;
    private int jobCount = 0;

    public PriorityJobScheduler(Integer poolSize, Integer queueSize) {
        priorityJobPoolExecutor = Executors.newFixedThreadPool(poolSize);
        countDownLatch = new CountDownLatch(poolSize);
        priorityQueue = new PriorityBlockingQueue<Job>(queueSize, Comparator.comparing(Job::getJobPriority));
        priorityJobScheduler.execute(() -> {
            while (true) {
                try {

                    Job j = priorityQueue.take();

                    j.initCdl(countDownLatch);

                    priorityJobPoolExecutor.execute(j);

                    jobCount++;

                    if (jobCount >= poolSize) {
                      countDownLatch.await();
                      jobCount = 0 ;
                      countDownLatch = new CountDownLatch(poolSize);
                }

                } catch (InterruptedException e) {
                    // exception needs special handling
                    break;
                }
            }
        });
    }

    public void scheduleJob(Job job) {
        priorityQueue.add(job);
    }

    public void cleanUp() {

        priorityJobScheduler.shutdown();
    }

}

测试

import java.util.ArrayList;
import java.util.List;

import com.job.Job;
import com.job.Job.JobPriority;
import com.scheduler.PriorityJobScheduler;

public class Test {

    private static int POOL_SIZE = 3;
    private static int QUEUE_SIZE = 10000;

    public static void main(String[] args) {

        
        PriorityJobScheduler pjs = new PriorityJobScheduler(POOL_SIZE, QUEUE_SIZE);

        for (int i = 0; i < 100; i++) {
            Job job1 = new Job("Job" + i + "low", JobPriority.LOW);
            Job job2 = new Job("Job" + i + "medium", JobPriority.MEDIUM);
            Job job3 = new Job("Job" + i + "high", JobPriority.HIGH);

            if (i < 30)
                pjs.scheduleJob(job1);
            else if (i < 60)
                pjs.scheduleJob(job2);
            else
                pjs.scheduleJob(job3);

            try {
                Thread.sleep(5);
            } catch (InterruptedException e) { // TODO
                e.printStackTrace();
            }

        }

        pjs.cleanUp();

    }

}

来源:https://www.baeldung.com/java-priority-job-schedule

Baeldung 的原始示例在此代码段中没有使用 CountDownLatch。您也可以浏览 Baeldung 网站上的教程。如果您对在这种情况下使用 CountDownLatch 有任何疑问,请随时在评论部分提问。