Runnable 的 ExecutorService,处理 Batches ArrayList 未完成处理

ExecutorService of Runnable, processing Batches of ArrayList not finishing processing

编辑:谢谢马克,对于那些有类似问题的人,我的问题是我首先制作了可运行的线程实例 class,然后将线程提交给执行服务。
它帮助我弄清楚,实际上,当我使用 ExecutorService 时,如果有未捕获的异常;它不会通知你,它会取消这个过程,没有通知。这就是我处理不完整的原因。

我有一个对象的 ArrayList,我想对其进行多线程批量处理,但在给定时间限制线程数 运行。我发现 ExecutorService 可以处理这个问题。但是在测试它是否正在处理每条记录时,它似乎只处理了我传递给它的一小部分对象。

编辑:我删除了它的多线程部分,并在不使用执行程序服务的情况下像平常一样处理对象,在小批量(仅 710)上,它工作正常;线程是否有可能完成得太快并且处理不正确?这通常意味着一次处理大约 300k-800k 条记录;这就是为什么我想对其进行多线程处理。

public void processContainerRecords(ArrayList<? extends ContainerRecord> records) {
    int cores = Runtime.getRuntime().availableProcessors();
    ExecutorService executor = Executors.newFixedThreadPool(cores);
    int batchSize = Settings.LOGIC_BATCH_SIZE;//100
    int batches = (int) Math.ceil((double) records.size() / (double) batchSize);

    ArrayList<Future<?>> threads = new ArrayList<Future<?>>();
    LogicProcessor newHandler = null;
    for (int startIndex = 0; startIndex < records.size(); startIndex += batchSize + 1) {
        if (records.size() < batchSize) {
            newHandler = new LogicProcessor(mainGUI, records.subList(startIndex, records.size()));
        } else {
            int bound = (startIndex + batchSize);
            if (bound > records.size()) {
                bound = records.size();
            }
            newHandler = new LogicProcessor(mainGUI, records.subList(startIndex, bound));
        }
        Thread newThread = new Thread(newHandler);
        Future<?> f = executor.submit(newThread);
        threads.add(f);
    }
    executor.shutdown();
    int completedThreads = 0;
    while (!executor.isTerminated()) {//monitors threads and waits until completion
        completedThreads = 0;
        for (Future<?> f : threads) {
            if (f.isDone()) {
                completedThreads++;
            }
        }
        //currentProgress = completedThreads;
    }

    for (ContainerRecord record : records) {//checks if each record has been processed
        System.out.println(record.getContainer() + ":" + record.isTouched());
    }
}

这是 LogicProcessor class 它启动

的线程实例
    private List<? extends ContainerRecord> archive;
private GUI mainGUI;

public LogicProcessor(GUI mainGUI, List<? extends ContainerRecord> records) {
    this.mainGUI = mainGUI;
    this.archive = records;
}

@Override
public void run() {
    handleLogic();
}

private void handleLogic() {
    Iterator iterator = archive.iterator();
    while (iterator.hasNext()) {
        ContainerRecord record = (ContainerRecord) iterator.next();
        record.touch();//sets a boolean in the object to validate if it has been processed yet.
    }
}

输出:在处理的 710 条记录(对象)中,691 条从未processed/touched,只有 19 条处理过。

这是怎么回事?我已经尝试了很多事情,甚至制作了一个 class LogicProcessor 数组并将实例保留在数组中以避免任何类型的 GC 删除实例。我不确定为什么它不处理这些记录。

我现在没有电脑可以 运行 测试,但是通过查看您的代码,所以我的回答是基于个人经验,甚至不能看作是代码审查,因为缺乏代码清晰度是错误来源:)

  1. 不要提交new Thread到执行器服务。执行者的重点是对用户隐藏线程这个词。相反,您的 LogicProcessor 应该实现 Runnable/Callable 接口,具体取决于您是否想要 return 该值。

  2. 再检查一下分批的逻辑。如果您使用番石榴,它已经实现了分区逻辑。参见 this tutorial。我承认这更多是个人喜好,您的代码可能也不错,我还没有深入检查。

  3. 关机方法和期货处理可能会得到简化。

调用shutdown方法会导致执行器服务停止接受要执行的新任务,但它不会立即关闭servicec,而是等到它已经拥有的所有任务都被执行。通常这样的线程池是在应用程序生命周期开始时创建的,并且只要应用程序 运行s 就一直存在。创建池非常昂贵,因为它会分配线程。

如果您想保持池打开但确保所有任务都已完成,您可以像您一样使用循环迭代期货。

所以我真的没有理由同时使用两者。如果您分配池只是为了提交一堆任务 - 调用 shutdown 就足够了。否则,您可以使用循环并将池视为全局对象,并在其他地方调用 shutdown,正如我上面所解释的那样。