Spring - 添加低优先级多线程服务(对生产性能无影响)

Spring - add a low priority multi threaded service (no impact to production performance)

我们有一个 Spring 应用程序,我想添加一个 服务,该服务将使用多个线程处理 10Ks ID,但将作为后台进程而不影响实时生产

服务将更新数据库并发送外部提供商请求。

我不想服务于impact/effect生产性能/时序,我想以低优先级对每个ID执行操作

我阅读了 previous post 关于在 Executer 中设置优先级的内容,但我希望所有其他线程的优先级都低于此特定 Executer 范围。

使用 ThreadPoolExecutor 的答案是否与我的案例更相关?

ThreadPoolExecutor threadPool = new ThreadPoolExecutor(1, numOfWorkerThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
threadPool.setThreadFactory(new OpJobThreadFactory(Thread.NORM_PRIORITY-2));

public final static class OpJobThreadFactory implements ThreadFactory {
 private int priority;
 public OpJobThreadFactory(int priority) {
  this(priority, true);
}

@Override
public Thread newThread(Runnable r) {
  Thread t = new Thread(r, namePrefix + threadNumber.getAndIncrement());
  t.setDaemon(daemon);
  t.setPriority(priority);
 }
}

或者我可以使用 Executors.newCachedThreadPool()

Creates a thread pool that creates new threads as needed, but will reuse previously constructed threads when they are available. These pools will typically improve the performance of programs that execute many short-lived asynchronous tasks.

另外 我应该使用 Spring bean 吗? 因为我需要在 demand/request 上创建池所以它似乎不是 needed/wrong

编辑 我应该使用 Spring Actuator 来获取此任务还是使用其他监控工具?

Spring Boot Actuator module helps you monitor and manage your Spring Boot application by providing production-ready features like health check-up, auditing, metrics gathering, HTTP tracing etc. All of these features can be accessed over JMX or HTTP endpoints.

Here Spring 中有一个关于基于优先级的任务执行的很好的教程。也许这会在某些方面对您有所帮助。

这是一种创建可配置的任务“堆”并始终将您的主要任务放在堆顶部的方法。

要总结这个过程,您应该创建一个自定义任务执行器。首先,您需要创建一个 ThreadPoolTask​​Executor bean 并重写一个方法。应该修改的属性是:CorePoolSize(初始线程数)、QueueCapacity(队列中等待的线程数)和MaxPoolSize(最大线程数)。使用这些参数,您可以配置您的应用程序限制,以便此服务不会影响生产性能。

 @Bean("CustomTaskExecutor")
  public TaskExecutor threadPoolTaskExecutor(
          @Value("${spring.async.core-pool-size}") int corePoolSize,
          @Value("${spring.async.max-pool-size}") int maxPoolSize,
          @Value("${spring.async.queue-capacity}") int queueCapacity) {
    ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor() {

      @Override
      protected BlockingQueue<Runnable> createQueue(int queueCapacity) {
        return new PriorityBlockingQueue<Runnable>(queueCapacity);
      }

    };
    executor.setCorePoolSize(corePoolSize);
    executor.setMaxPoolSize(maxPoolSize);
    executor.setQueueCapacity(queueCapacity);
    return executor;
  }

之后,您需要创建任务执行者可以优先处理的任务 理解。为此,我们需要创建两个 classes: 1) 实现 Runnable 接口的自定义 class 将成为 运行 任务 2) 扩展 FutureTask 并实现 Comparable 接口的包装器 class,以便任务执行器可以理解任务的优先级选择逻辑

public class Task implements Runnable {
    private Consumer<Job> jobConsumer;
    private Job job;
      public Job getJob() {
        return this.job;
      }
      public Task(Consumer<Job> jobConsumer, Job job) {
        this.jobConsumer = jobConsumer;
        this.job = job;
      }
      @Override
      public void run() {
        this.jobConsumer.accept(job);
      }
    }

那么你就有了 FutureCustomTask class:

public class FutureCustomTask extends FutureTask<FutureCustomTask> implements Comparable<FutureCustomTask> {
private Task task;
public FutureCustomTask(Task task) {
    super(task, null);
    this.task = task;
  }
@Override
  public int compareTo(FutureCustomTask o) {
    return task.getJob().getPriority().compareTo(o.task.getJob().getPriority());
  }
}

为了执行,TaskExecutor 需要自动装配。 然后,您可以创建您的 Task 对象,将其包装在 FutureCustomTask 中,并将其传递给 TaskExecutor.The 代码应如下所示:

@Autowired
private TaskExecutor taskExecutor;
@Autowired
private JobBusiness jobBusiness;
...
Task task = new Task(jobBusiness::performSomethingOn, job);
taskExecutor.execute(new FutureCustomTask(task));

我想解释一下这个问题

什么是线程优先级?根据 java SE 文档

每个线程都有一个优先级。优先级较高的线程优先于优先级较低的线程执行。

即使您创建具有优先级的线程,它也不能完全保证优先级较低的线程首先执行,您可能必须阻塞优先级较低的线程,直到其他线程被执行

对于小型 java 程序,您可以自己处理线程执行,但对于大型程序,建议您使用 vanilla Java 中的 Executor Framework,它来自包 java.util.concurrent 或使用 spring TaskExecutor.By 同时使用这两个框架,您可以异步执行任务,这些任务在后台作为主任务的一部分执行。

对生产的影响:

例如,主要任务将是调用您的其余端点,即 /account,并在调用您要向客户发送欢迎电子邮件的帐户端点时,第三方 API 调用,可以使用 Executor Framework 或 Spring TaskExecutor 异步执行它们,即作为后台进程,它们不会对当前 API 产生影响,但它肯定会对生产服务器有影响,因为您是 运行 同一 JVM 中的线程,并且它们共享公共内存。如果创建了多个线程但没有销毁。你的服务器肯定会宕机。

因此,使用 Executor Framework 或 Spring TaskExecutor 并不能保证它不会影响您当前的生产,它肯定会提高被调用的其余 API 的性能。因为它是异步执行的,而且关于你的其他问题

我可以使用 Executors.newCachedThreadPool()

是的,如果您有许多短期任务,例如更新数据库中的单个列或仅触发一次休息端点,并且它不适合批量加载或执行某些更新 10000 条记录的后端作业,因为它将为每个任务创建更多线程,您肯定会遇到内存问题。

另外,我应该使用 Spring bean 吗?因为我需要创建一个池 on-demand/request 所以它似乎不是 needed/wrong

是的,您可以使用 ThreadPoolTask​​Executor https://docs.spring.io/spring-framework/docs/current/javadoc-api/org/springframework/scheduling/concurrent/ThreadPoolTaskExecutor.html 根据文档将 "queueCapacity" 设置为 0 模仿 Executors.newCachedThreadPool() 。因此您可以使用 Spring TaskExecutor 或使用 vanilla Java 中的 Executor Framework 我个人建议使用 Spring TaskExecutor 具有更多功能,可以很好地开始使用 Spring 你可以参考教程 https://egkatzioura.com/2017/10/25/spring-and-async/ 这是一个很好的开始

最终判决:如果您希望仅执行异步任务或后台进程,您可以使用 java 中的 Executor Framework 或 Spring TaskExecutor,但两者都会对生产产生影响因为它们使用相同的 JVM。如果您根本不想影响生产,那么我建议在不同的服务器上创建单独的 spring 启动应用程序,并从新应用程序进行数据库调用或服务调用,并将其公开为休息端点并使用 Spring 任务执行器从您的主应用程序异步调用这些端点。

Spring 任务执行者:https://egkatzioura.com/2017/10/25/spring-and-async/

Java 执行器框架:https://stackabuse.com/concurrency-in-java-the-executor-framework/

使用低优先级的线程: https://medium.com/@daniyaryeralin/priority-based-thread-pooling-in-spring-framework-d74b91b51dcb

抱歉,如果答案太长:)