使用 corePoolSize 而不是 maxPoolSize 的 InvokeAll 方法

InvokeAll method using corePoolSize instead of maxPoolSize

我有一个应用程序需要将数据提取到 ES 中。有多行,所以我使用 ThreadPoolTaskExecutor 以 50 批次的方式并行化数据的摄取。另外,我正在我的消费者中获取这些数据,我不想在处理完所有记录之前确认,所以我已将任务封装在 Runnable class 中,我将它们添加到可调用列表中,并在列表包含所有数据时使用 invokeAll() 。

我的 ThreadPoolTaskExecutor 的 maxPoolSize 是 200corePoolSize50。因此,我希望应用程序一次使用 200 个线程,据我所知,当排队的任务少于 corePoolSize 时,ThreadPoolTaskExecutor 使用线程最多 corePoolSize 并且稍后它开始将线程添加到 maxPoolSize 并执行任务。当我检查我的日志和 visualVm 时,我发现只有 50 个线程在执行,所以,我将 corePoolSize 增加到 70 并发现 70 个线程正在执行执行。队列大小应该超过 3000,因为列表超过 300 万。该问题的示例代码。

public void Test() throws InterruptedException {
List<Callable<Object>> tagList = new ArrayList<Callable<Object>>(1000);
for(int i=0; i<400; i++){
  tagList.add(Executors.callable(new Runnable() {
   @Override
   public void run() {
     System.out.println("Thread "+ Thread.currentThread());
     try {
       Thread.sleep(4000);
     } catch (InterruptedException e) {
       e.printStackTrace();
     }
   }
 }));
}
ThreadPoolTaskExecutor.getThreadPoolExecutor().invokeAll(tagList);
}

信息 | 2021-02-14 00:06:32.086+05:30 |申请 | 134 |当前线程数:50 总线程数:200

信息 | 2021-02-14 00:06:33.085+05:30 |申请 | 134 |当前线程数:50 总线程数:200

如果我遗漏了什么,请有人解释一下。

maxPoolSize of my ThreadPoolTaskExecutor is 200 and corePoolSize is 50.So,I was expecting the application to use 200 threads at one time as by my understanding ThreadPoolTaskExecutor uses threads upto to corePoolSize when queued up tasks are less than the corePoolSize and later it starts adding threads upto the maxPoolSize and executes the tasks.

这不太对。 ThreadPoolExecutor:

就是这种情况

When a new task is submitted (..), and fewer than corePoolSize threads are running, a new thread is created to handle the request, even if other worker threads are idle. If there are more than corePoolSize but less than maximumPoolSize threads running, a new thread will be created only if the queue is full.

因此,即使 maxPoolSize 是 200,您只有 50 个线程 运行 的原因。

ThreadPoolTaskExecutor可以读到:

The default configuration is a core pool size of 1, with unlimited max pool size and unlimited queue capacity. This is roughly equivalent to Executors.newSingleThreadExecutor(), sharing a single thread for all tasks. Setting "queueCapacity" to 0 mimics Executors.newCachedThreadPool(), with immediate scaling of threads in the pool to a potentially very high number. Consider also setting a "maxPoolSize" at that point, as well as possibly a higher "corePoolSize" (see also the "allowCoreThreadTimeOut" mode of scaling).

看来您必须将 queueCapacity 设置为零才能满足您的要求。