Runnable 的 ExecutorService,处理 Batches ArrayList 未完成处理
ExecutorService of Runnable, processing Batches of ArrayList not finishing processing
编辑:谢谢马克,对于那些有类似问题的人,我的问题是我首先制作了可运行的线程实例 class,然后将线程提交给执行服务。
它帮助我弄清楚,实际上,当我使用 ExecutorService 时,如果有未捕获的异常;它不会通知你,它会取消这个过程,没有通知。这就是我处理不完整的原因。
我有一个对象的 ArrayList,我想对其进行多线程批量处理,但在给定时间限制线程数 运行。我发现 ExecutorService 可以处理这个问题。但是在测试它是否正在处理每条记录时,它似乎只处理了我传递给它的一小部分对象。
编辑:我删除了它的多线程部分,并在不使用执行程序服务的情况下像平常一样处理对象,在小批量(仅 710)上,它工作正常;线程是否有可能完成得太快并且处理不正确?这通常意味着一次处理大约 300k-800k 条记录;这就是为什么我想对其进行多线程处理。
public void processContainerRecords(ArrayList<? extends ContainerRecord> records) {
int cores = Runtime.getRuntime().availableProcessors();
ExecutorService executor = Executors.newFixedThreadPool(cores);
int batchSize = Settings.LOGIC_BATCH_SIZE;//100
int batches = (int) Math.ceil((double) records.size() / (double) batchSize);
ArrayList<Future<?>> threads = new ArrayList<Future<?>>();
LogicProcessor newHandler = null;
for (int startIndex = 0; startIndex < records.size(); startIndex += batchSize + 1) {
if (records.size() < batchSize) {
newHandler = new LogicProcessor(mainGUI, records.subList(startIndex, records.size()));
} else {
int bound = (startIndex + batchSize);
if (bound > records.size()) {
bound = records.size();
}
newHandler = new LogicProcessor(mainGUI, records.subList(startIndex, bound));
}
Thread newThread = new Thread(newHandler);
Future<?> f = executor.submit(newThread);
threads.add(f);
}
executor.shutdown();
int completedThreads = 0;
while (!executor.isTerminated()) {//monitors threads and waits until completion
completedThreads = 0;
for (Future<?> f : threads) {
if (f.isDone()) {
completedThreads++;
}
}
//currentProgress = completedThreads;
}
for (ContainerRecord record : records) {//checks if each record has been processed
System.out.println(record.getContainer() + ":" + record.isTouched());
}
}
这是 LogicProcessor class 它启动
的线程实例
private List<? extends ContainerRecord> archive;
private GUI mainGUI;
public LogicProcessor(GUI mainGUI, List<? extends ContainerRecord> records) {
this.mainGUI = mainGUI;
this.archive = records;
}
@Override
public void run() {
handleLogic();
}
private void handleLogic() {
Iterator iterator = archive.iterator();
while (iterator.hasNext()) {
ContainerRecord record = (ContainerRecord) iterator.next();
record.touch();//sets a boolean in the object to validate if it has been processed yet.
}
}
输出:在处理的 710 条记录(对象)中,691 条从未processed/touched,只有 19 条处理过。
这是怎么回事?我已经尝试了很多事情,甚至制作了一个 class LogicProcessor 数组并将实例保留在数组中以避免任何类型的 GC 删除实例。我不确定为什么它不处理这些记录。
我现在没有电脑可以 运行 测试,但是通过查看您的代码,所以我的回答是基于个人经验,甚至不能看作是代码审查,因为缺乏代码清晰度是错误来源:)
不要提交new Thread
到执行器服务。执行者的重点是对用户隐藏线程这个词。相反,您的 LogicProcessor
应该实现 Runnable/Callable 接口,具体取决于您是否想要 return 该值。
再检查一下分批的逻辑。如果您使用番石榴,它已经实现了分区逻辑。参见 this tutorial。我承认这更多是个人喜好,您的代码可能也不错,我还没有深入检查。
关机方法和期货处理可能会得到简化。
调用shutdown
方法会导致执行器服务停止接受要执行的新任务,但它不会立即关闭servicec,而是等到它已经拥有的所有任务都被执行。通常这样的线程池是在应用程序生命周期开始时创建的,并且只要应用程序 运行s 就一直存在。创建池非常昂贵,因为它会分配线程。
如果您想保持池打开但确保所有任务都已完成,您可以像您一样使用循环迭代期货。
所以我真的没有理由同时使用两者。如果您分配池只是为了提交一堆任务 - 调用 shutdown
就足够了。否则,您可以使用循环并将池视为全局对象,并在其他地方调用 shutdown
,正如我上面所解释的那样。
编辑:谢谢马克,对于那些有类似问题的人,我的问题是我首先制作了可运行的线程实例 class,然后将线程提交给执行服务。
它帮助我弄清楚,实际上,当我使用 ExecutorService 时,如果有未捕获的异常;它不会通知你,它会取消这个过程,没有通知。这就是我处理不完整的原因。
我有一个对象的 ArrayList,我想对其进行多线程批量处理,但在给定时间限制线程数 运行。我发现 ExecutorService 可以处理这个问题。但是在测试它是否正在处理每条记录时,它似乎只处理了我传递给它的一小部分对象。
编辑:我删除了它的多线程部分,并在不使用执行程序服务的情况下像平常一样处理对象,在小批量(仅 710)上,它工作正常;线程是否有可能完成得太快并且处理不正确?这通常意味着一次处理大约 300k-800k 条记录;这就是为什么我想对其进行多线程处理。
public void processContainerRecords(ArrayList<? extends ContainerRecord> records) {
int cores = Runtime.getRuntime().availableProcessors();
ExecutorService executor = Executors.newFixedThreadPool(cores);
int batchSize = Settings.LOGIC_BATCH_SIZE;//100
int batches = (int) Math.ceil((double) records.size() / (double) batchSize);
ArrayList<Future<?>> threads = new ArrayList<Future<?>>();
LogicProcessor newHandler = null;
for (int startIndex = 0; startIndex < records.size(); startIndex += batchSize + 1) {
if (records.size() < batchSize) {
newHandler = new LogicProcessor(mainGUI, records.subList(startIndex, records.size()));
} else {
int bound = (startIndex + batchSize);
if (bound > records.size()) {
bound = records.size();
}
newHandler = new LogicProcessor(mainGUI, records.subList(startIndex, bound));
}
Thread newThread = new Thread(newHandler);
Future<?> f = executor.submit(newThread);
threads.add(f);
}
executor.shutdown();
int completedThreads = 0;
while (!executor.isTerminated()) {//monitors threads and waits until completion
completedThreads = 0;
for (Future<?> f : threads) {
if (f.isDone()) {
completedThreads++;
}
}
//currentProgress = completedThreads;
}
for (ContainerRecord record : records) {//checks if each record has been processed
System.out.println(record.getContainer() + ":" + record.isTouched());
}
}
这是 LogicProcessor class 它启动
的线程实例 private List<? extends ContainerRecord> archive;
private GUI mainGUI;
public LogicProcessor(GUI mainGUI, List<? extends ContainerRecord> records) {
this.mainGUI = mainGUI;
this.archive = records;
}
@Override
public void run() {
handleLogic();
}
private void handleLogic() {
Iterator iterator = archive.iterator();
while (iterator.hasNext()) {
ContainerRecord record = (ContainerRecord) iterator.next();
record.touch();//sets a boolean in the object to validate if it has been processed yet.
}
}
输出:在处理的 710 条记录(对象)中,691 条从未processed/touched,只有 19 条处理过。
这是怎么回事?我已经尝试了很多事情,甚至制作了一个 class LogicProcessor 数组并将实例保留在数组中以避免任何类型的 GC 删除实例。我不确定为什么它不处理这些记录。
我现在没有电脑可以 运行 测试,但是通过查看您的代码,所以我的回答是基于个人经验,甚至不能看作是代码审查,因为缺乏代码清晰度是错误来源:)
不要提交
new Thread
到执行器服务。执行者的重点是对用户隐藏线程这个词。相反,您的LogicProcessor
应该实现 Runnable/Callable 接口,具体取决于您是否想要 return 该值。再检查一下分批的逻辑。如果您使用番石榴,它已经实现了分区逻辑。参见 this tutorial。我承认这更多是个人喜好,您的代码可能也不错,我还没有深入检查。
关机方法和期货处理可能会得到简化。
调用shutdown
方法会导致执行器服务停止接受要执行的新任务,但它不会立即关闭servicec,而是等到它已经拥有的所有任务都被执行。通常这样的线程池是在应用程序生命周期开始时创建的,并且只要应用程序 运行s 就一直存在。创建池非常昂贵,因为它会分配线程。
如果您想保持池打开但确保所有任务都已完成,您可以像您一样使用循环迭代期货。
所以我真的没有理由同时使用两者。如果您分配池只是为了提交一堆任务 - 调用 shutdown
就足够了。否则,您可以使用循环并将池视为全局对象,并在其他地方调用 shutdown
,正如我上面所解释的那样。