java 多线程 - 限制对 ExecutorService 的提交

java Multithreading - throttle submission to ExecutorService

我有一个包含数千行的数据文件。我正在阅读它们并将它们保存在数据库中。我想在 50 个批次中对这个过程进行多线程处理 行。正如我在文件中所读到的,10 行已提交给 ExecutorService。

ExecutorService executor = Executors.newFixedThreadPool(5);`

我可以在一个 while 循环中执行下面的操作,直到我的行结束....

 Future<Integer> future = executor.submit(callableObjectThatSaves10RowsAtOneTime);

但是,如果处理 10 行需要时间,我不想将整个文件读入内存。我只想提交 5 直到其中一个线程 return,然后我提交下一个。

假设一个线程需要 20 秒来保存 10 条记录,我不希望 ExecutorService 被提供数千行,因为读取过程正在继续读取并提交给 ExecutorService

实现此目标的最佳方法是什么?

您可以使用 LinkedList<Future<?>> 存储期货,直到您达到某个预定大小。下面是一些框架代码,应该可以帮助您完成大部分工作:

int threads = 5;
ExecutorService service = Executors.newFixedThreadPool(threads);
LinkedList<Future<?>> futures = new LinkedList<>();

//As long as there are rows to save:
while(moreRowsLeft()){
    //dump another callable onto the queue:
    futures.addLast(service.submit(new RowSavingCallable());

    //if the queue is "full", wait for the next one to finish before
    //reading any more of the file:
    while(futures.size() >= 2*threads) futures.removeFirst().get();
}

//All rows have been submitted but some may still be writing to the DB:
for(Future<?> f : futures) future.get();

//All rows have been saved at this point

您可能想知道为什么我允许 futures 的数量达到机器上线程数量的两倍 - 这允许执行程序服务线程在主线程创建更多工作时处理数据库保存.这有助于隐藏任何 I/O 与在工作线程忙于执行数据库写入时使更多可调用对象可用于处理相关的成本。