blockingqueue 和 Spring - 如何在启动时启动线程池?

blockingqueue and Spring - how do I start thread pool at startup?

我有一个 spring 应用程序,它使用 blockingqueue 来 运行 生产者-消费者设计。基本上,当用户使用 REST 控制器进行 API 调用时,它会向阻塞队列添加一个工作,后台线程将在它到达后立即使用该队列。

我看到 Spring 建议使用它的 TaskExecutor,所以我有以下 class。 ThreadConfig.java

@Configuration
public class ThreadConfig {

  @Bean
  public TaskExecutor threadPoolTaskExecutor() {
    ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
    executor.setCorePoolSize(4);
    executor.setMaxPoolSize(4);
    executor.setThreadNamePrefix("default_task_executor_thread");
    executor.initialize();
    return executor;
  }

}

我还有一个消费者组件,它监视队列和 运行 任务。 MessageConsumer.java

@Component
public class MessageConsumer implements Runnable{


  private final BlockingQueue<String> queue;

  MessageConsumer(BlockingQueue<String> queue){
    this.queue = queue;
  }

  public void run(){
    try {
      while (true) {
        String str = queue.take();
        // Do something
      }
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    }
  }

}

现在,我不确定 spring 应用程序启动时如何启动线程池。

我只是在 Main 中添加代码吗?

如有任何帮助,我们将不胜感激。谢谢

我觉得你想多了。在内部 ThreadPoolTaskExecutor 还使用 BlockingQueue 将工作交给 ThreadPool 中的工作人员 Threads。由于您没有更改默认 queueCapacity 它将是 LinkedBlockingQueue.

在您的场景中,这样做会更容易:

  • 由于每个 HTTP 请求都由单独的 Thread 处理,生产者将是处理 HTTP 请求的 Thread
  • 消费者将是 ThreadPoolTaskExecutor 的工作人员 Thread

因此,要使生产者-消费者工作,只需 ThreadPoolTaskExecutor 创建一个任务并将其提交到您的线程池,其中一个工作人员将使用它:

@RestController
public class MyController {

    private final TaskExecutor taskExecutor;

    @Autowired
    public MyController(final TaskExecutor taskExecutor) {
        this.taskExecutor = taskExecutor;
    }

    @GetMapping("/test/{value}")
    public ResponseEntity<String> get(final @PathVariable("value") String value) {
        taskExecutor.execute(() -> {

            System.out.println(value);
            // do something with your String
            // this will be executed by some worker Thread
        });

        return ResponseEntity.ok(value);
    }
}

而且由于您正在使用 ThreadPoolTaskExecutor 并且它实现了 DisposableBean 接口(通过扩展 ExecutorConfigurationSupport)- 您不必显式 shutdown 池。 Spring 会在 Spring 上下文被销毁并且 destroy 将在池 Bean 上调用此接口的方法时为您执行此操作。