如果在这种情况下为 100,如何清除我的 ArrayList 的大小:

How to clear my ArrayList is size if 100 in this scenario:

我的最终目标是在Executor Service中实现行的批处理。我有以下代码片段:

while ((bLine = bufferedReader.readLine()) != null) {
    // We will use an array to hold 100 lines, so that we can batch process in a
    // single thread
    batchArray.add(bLine);
    switch (batchArray.size()) {
        case 100:
            Future<?> future = executor.submit(new LocalThreadPoolExecutor(batchArray, closeableHttpClient, httpPost));
            futures.add(future);
           // batchArray.clear() <--- point of failure
            break;
        default:
            logger.info("Batch size in switch: "+batchArray.size());

    }
}

现在,如果我在 case 100 中执行 batchArray.clear(),我会得到一个 concurrentModificationException。无法确定如何重新初始化数组列表并将 100 行从文件中读取时发送给我的执行程序。

下面是堆栈跟踪:

java.util.ConcurrentModificationException
    at java.util.ArrayList$Itr.checkForComodification(ArrayList.java:859)
    at java.util.ArrayList$Itr.next(ArrayList.java:831)
    at consumer.ril.com.LocalThreadPoolExecutor.run(LocalThreadPoolExecutor.java:37)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
    at java.util.concurrent.FutureTask.run(FutureTask.java:262)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:745)

当我尝试读取 batchArray 时,我在我的 LocalThreadPoolExecutor class 中遇到了异常,它是在此 class 的构造函数中传递的。

您的对象 batchArray 是通过引用传递的,有关详细信息,请参阅: Is Java "pass-by-reference" or "pass-by-value"?

因此,您的 class LocalThreadPoolExecutor 仍然对其有引用,无法修改。

使用 clone 或设置参数 final 都应该有效。

简单的解决方案——您需要将数组副本传递给 LocalThreadPoolExecutor 并清理原始数组。

Future<?> future = executor.submit(new LocalThreadPoolExecutor(new ArrayList<>
(batchArray), closeableHttpClient, httpPost));
futures.add(future);
batchArray.clear();

一些代码在 LocalThreadPoolExecutor 内部使用列表(迭代器)。在某些时候,它意识到列表已被修改(清除)。您应该使用列表的另一个副本。

由于您不需要主线程中的项目,您可以为每个批次显式创建新列表并将其传递给处理器:

类似于:

{   
...
     while ((batch = getNextBatch(bufferedReader, 100)).size() > 0) {
        futures.add(
            executor.submit(new LocalThreadPoolExecutor(batch, closeableHttpClient, httpPost))
        );
    }
...
}

获取下一批:

List<String> getNextBatch(BufferedReader bReader, int batchSize) throws IOException {
    List<String> batch = new ArrayList<>(batchSize);
    while (batch.size() < batchSize && (bLine = bReader.readLine()) != null) {
        batch.add(bLine);
    }
    return batch;
}