Java 随着线程池大小的增加,多线程性能最差

Java multi threading performance worst as increasing thread pool size

我在mongoDB中有4000万条数据。我正在从集合中并行读取数据,对其进行处理并将其转储到另一个集合中。

作业初始化示例代码。

ExecutorService executor = Executors.newFixedThreadPool(10);
int count = total_number_of_records in reading collection
int pageSize = 5000;
int counter = (int) ((count%pageSize==0)?(count/pageSize):(count/pageSize+1));
for (int i = 1; i <= counter; i++) {
        Runnable worker = new FinalParallelDataProcessingStrategyOperator(mongoDatabase,vendor,version,importDate,vendorId,i,securitiesId);
        executor.execute(worker);
    }

每个线程都在做以下事情

public void run() {
    try {
        List<SecurityTemp> temps = loadDataInBatch();
        populateToNewCollection(temps);
        populateToAnotherCollection(temps);
    } catch (IOException e) {
        e.printStackTrace();
    }
}

使用以下查询对加载数据进行分页

mongoDB.getCollection("reading_collection").find(whereClause).
            .skip(pagesize*(n-1)).limit(pagesize).batchSize(1000).iterator();

pagination code reference

机器配置: 2 CPU 每个 1 个核心

并行实现的性能几乎与顺序实现相同。 数据子集的统计信息(319568 条记录)

No. of Threads   Execution Time(minutes)

   1                 16 
   3                 15
   8                 17
   10                17
   15                16
   20                12
   50                30

如何提高此应用程序的性能?

由于您从单一源读取输入数据,该部分很可能是 IO 绑定的(从您的应用程序的角度来看),因此并行执行它不会给您带来太多好处。相反 - 我认为在多个线程上并行执行类似的查询(只是分页不同)会对性能产生负面影响:必须在数据库上多次完成相同的工作并且并行查询可能会相互影响方式。

另一个问题是,与读取输入相比,处理部分是否占用了大量时间。如果它不使用并行处理将无助于加快速度。如果是,我建议如下:

  • 使用单个查询从数据库中获取数据
  • 有多个工作线程从结果集或中间队列获取数据项并处理它们。不需要有固定的批次,每个工作人员在处理完前一个项目后,只需抓取下一个可用项目。

至于线程数:最短处理时间的"sweet spot"取决于处理的种类。对于没有太多 IO 处理的 CPU 密集型任务,它很可能与可用内核的数量有关 - 在您的情况下为 2。

多线程不会随着线程数量的增加而提高性能。

受 IO 限制的应用程序不会从多线程中获益太多。

这取决于很多因素。参考这个相关的SE问题:

即使对于 IO 限制较少、CPU 密集型应用程序,也不要配置大量线程来提高性能。

您可以将代码更改为:

ExecutorService executor = Executors.newFixedThreadPool(
Runtime.getRuntime().availableProcessors());

或者(如下所示的 ForkJoinPool [从 jdk 1.8 版本开始工作)

ExecutorService executor = Executors.newWorkStealingPool()

Executors API:

public static ExecutorService newWorkStealingPool()

Creates a work-stealing thread pool using all available processors as its target parallelism leve