使用多个线程处理 'N' 项的列表

Process list of 'N' items with multiple threads

我有 ListN 项,我想按顺序将这个 List 分成固定数量的 threads

按顺序我的意思是,我想将 1 to N/4 传递给第一个 threadN/4 + 1 to N/2 传递给第二个线程,N/2+1 to N 传递给第三个 thread,现在一旦所有 threads 都完成了他们的工作,我想通知 main thread 发送一些消息,说明所有处理都已完成。

到目前为止我所做的是我已经实现了ExecutorService

我做了这样的事情

ExecutorService threadPool = Executors.newFixedThreadPool(Number_of_threads); 
                //List of items List
                List <items>itemList = getList(); 
                  for (int i = 0 i < Number_of_threads ;i++ ) { 
                    //how to divide list here sequentially and pass it to some processor while will process those items.
                    Runnable processor = new Processor(Start, End)
                    executor.execute(process);
                   }
                  if(executor.isTerminated()){
                    logger.info("All threads completed");
                  }

如果你想要的是让所有线程尽快完成处理并且项目的数量不是巨大那么只需post一个Runnable 每个项目变成 newFixedThreadPool(NUMBER_OF_THREADS):

    ExecutorService exec = Executors.newFixedThreadPool(NUMBER_OF_THREADS);
    List<Future<?>> futures = new ArrayList<Future<?>>(NUMBER_OF_ITEMS);
    for (Item item : getItems()) {
        futures.add(exec.submit(new Processor(item)));
    }
    for (Future<?> f : futures) {
        f.get(); // wait for a processor to complete
    }
    logger.info("all items processed");

如果您真的想给每个线程一个列表的连续部分(但仍然希望它们尽快完成,并且还希望处理每个项目大约需要相同的时间),然后尽可能将项目拆分为 "evenly",以便每个线程的最大项目数与最小数之差不超过一个(例如:14 项目,4 个线程,那么您希望拆分为 [4,4,3,3],而不是 [3,3,3,5])。为此,您的代码将是例如

    ExecutorService exec = Executors.newFixedThreadPool(NUMBER_OF_THREADS);
    List<Item> items = getItems();
    int minItemsPerThread = NUMBER_OF_ITEMS / NUMBER_OF_THREADS;
    int maxItemsPerThread = minItemsPerThread + 1;
    int threadsWithMaxItems = NUMBER_OF_ITEMS - NUMBER_OF_THREADS * minItemsPerThread;
    int start = 0;
    List<Future<?>> futures = new ArrayList<Future<?>>(NUMBER_OF_ITEMS);
    for (int i = 0; i < NUMBER_OF_THREADS; i++) {
        int itemsCount = (i < threadsWithMaxItems ? maxItemsPerThread : minItemsPerThread);
        int end = start + itemsCount;
        Runnable r = new Processor(items.subList(start, end));
        futures.add(exec.submit(r));
        start = end;
    }
    for (Future<?> f : futures) {
        f.get();
    }
    logger.info("all items processed");