如何对我的顺序 Java 代码进行多线程处理
How to multi-thread my sequential Java code
我有一个 Java 程序,它给定一个列表,对每个列表项执行一些 独立的 过程(包括从一些 HTTP 资源中检索文本并将它们插入到一个独立的 HashMap),最后在这些 HashMap 上计算一些数字。主要片段如下所示:
for (int i = 0; i < mylist.size(); i++) {
long startepoch = getTime(mylist.get(i).time);
MyItem m = mylist.get(i);
String index=(i+1)+"";
process1(index, m.name, startepoch, m.duration);
//adds to hashmap1
if(m.name.equals("TEST")) {
process2(index, m.name, startepoch, m.duration);
//adds to hashmap2
} else {
process3(index, m.name, startepoch, m.duration);
//adds to hashmap3
process4(index, m.name, startepoch, m.duration);
//adds to hashmap4
process5(index, m.name, startepoch, m.duration);
//adds to hashmap5
process6(index, m.name, startepoch, m.duration);
//adds to hashmap6
}
}
// then start calculation on all hashmaps
calculate_all();
由于目前此代码段是按顺序执行的,因此对于包含 500 个项目的列表,这可能需要 30 分钟左右的时间。我怎样才能对我的代码进行多线程处理以使其更快?并以线程安全的方式?
我尝试使用 ExecutorService executorService = Executors.newFixedThreadPool(10);
,然后通过如下方式包装将每个进程提交给 executorService
,但问题是我不知道他们什么时候完成,所以要调用 calculate_all()
。所以我没有继续。
executorService.submit(new Runnable() {
public void run() {
process2(index, m.name, startepoch, m.duration);
}
});
有更好的工作思路吗?
but the problem was I couldn't know when they finish
当您向 Executor 提交内容时,您会返回一个包含结果(如果有)的 Future
。
然后您可以从您的主线程调用 Future::get
以等待这些结果(或者只是您的情况的完成)。
List<Future<?>> completions = executor.invokeAll(tasks);
// later, when you need to wait for completion
for(Future<?> c: completions) c.get();
您需要注意的另一件事是如何存储结果。如果您计划让您的任务将它们放入某个共享数据结构中,请确保该任务是线程安全的。从 Runnable
更改为 Callable
可能更容易,以便任务可以 return 结果(稍后您可以在主线程上以单线程方式合并)。
请注意,多线程并不一定能提高速度。多线程主要用于通过防止不必要的睡眠等来减少空闲 CPU 周期。
对于您提供的内容,我无能为力,但是,我认为您可以先做这样的事情:
- 使用线程安全的数据结构。这是必须的。如果你错过了这个
步骤,你的软件最终会崩溃。你会有一个
很难找到原因。 (例如,如果你有一个 ArrayList,
使用线程安全的)
- 您可以通过删除 for 循环开始尝试多线程
并为每次执行使用一个线程。如果你的 for 循环
大小超过你的线程数,你将拥有
将它们排入队列。
- 您有一个最终计算需要所有其他线程
结束。您可以使用 CountDownLatch、wait()/notifyAll() 或
synchronized() 取决于你的实现。
- 执行你的最终计算。
编辑
回应 (2):
您当前的执行是这样的:
for (int i = 0; i < mylist.size(); i++) {
some_processes();
}
// then start calculation on all hashmaps
calculate_all();
现在,要删除 "for" 循环,您可以先从增加 "for" 循环开始。例如:
// Assuming mylist.size() is around 500 and you want, say 5, hardcoded multi-thrads
Thread_1:
for (int i = 0; i < 100; i++) {
some_processes();
}
Thread_2:
for (int i = 100; i < 200; i++) {
some_processes();
}
Thread_3:
for (int i = 200; i < 300; i++) {
some_processes();
}
Thread_4:
for (int i = 300; i < 400; i++) {
some_processes();
}
Thread_5:
for (int i = 400; i < mylist.size(); i++) {
some_processes();
}
// Now you can use these threads as such:
CountDownLatch latch = new CountDownLatch(5);
ExecutorService executor = Executors.newFixedThreadPool(5);
executor.submit(new Thread1(latch));
executor.submit(new Thread2(latch));
executor.submit(new Thread3(latch));
executor.submit(new Thread4(latch));
executor.submit(new Thread5(latch));
try {
latch.await(); // wait until latch counted down to 0
} catch (InterruptedException e) {
e.printStackTrace();
}
// then start calculation on all hashmaps
calculate_all();
如您所见,此方法有几个缺点。例如,如果列表大小变为 380 怎么办?然后你有一个空闲线程。另外,如果你想要超过 5 个线程怎么办?
所以在这一点上,您可以通过让循环越来越少来进一步增加 "for" 循环的数量。最多,"for loop count" == "thread count",有效地删除了你的 for 循环。所以从技术上讲,您需要 "mylist.size()" 个线程。您可以这样实现:
// Allow a maximum amount of threads, say mylist.size(). I used LinkedBlockingDeque here because you might choose something lower than mylist.size().
BlockingQueue<String> blockingQueue = new LinkedBlockingDeque<>(mylist.size());
CountDownLatch latch = new CountDownLatch(mylist.size());
new Thread(new add_some_processes_w_single_loop_for_loop_to_queue(queue, latch)).start();
new Thread(new take_finished_processes_from_queue(queue)).start();
try {
latch.await(); // wait until latch counted down to 0
} catch (InterruptedException e) {
e.printStackTrace();
}
// then start calculation on all hashmaps
calculate_all();
请注意,通过这种安排,我们已经删除了您的初始 "for" 循环,而是创建了另一个循环,该循环仅在队列为空时提交新线程。您可以使用生产者和消费者应用程序检查 BlockingQueue 示例。例如参见:BlockingQueue examples
编辑 2
Future
的简单实现可能如下所示:
ExecutorService executorService = Executors.newCachedThreadPool();
Future future1, future2, future3, future4, future5, future6;
for (int i = 0; i < mylist.size(); i++) {
long startepoch = getTime(mylist.get(i).time);
MyItem m = mylist.get(i);
String index=(i+1)+"";
future1 = executorService.submit(new Callable() {...})
//adds to hashmap1
future1.get(); // Add this if you need to wait for process1 to finish before moving on to others. Also, add a try{}catch{} block as shown below.
if(m.name.equals("TEST")) {
future2 = executorService.submit(new Callable() {...})
//adds to hashmap2
future2.get(); // Add this if you need to wait for process2 to finish before moving on to others. Also, add a try{}catch{} block as shown below.
} else {
future3 = executorService.submit(new Callable() {...})
//adds to hashmap3
future4 = executorService.submit(new Callable() {...})
//adds to hashmap4
future5 = executorService.submit(new Callable() {...})
//adds to hashmap5
future6 = executorService.submit(new Callable() {...})
//adds to hashmap6
// Add extra future.get here as above...
}
}
// then start calculation on all hashmaps
calculate_all();
不要忘记添加 try-catch 块,否则您可能无法从异常中恢复并崩溃。
// Example try-catch block surrounding a Future.get().
try {
Object result = future.get();
} catch (ExecutionException e) {
//Do something
} catch (InterruptedException e) {
//Do something
}
但是,您可以有一个更复杂的,如图 here。 link 也解释了 Thilo 的回答。
我有一个 Java 程序,它给定一个列表,对每个列表项执行一些 独立的 过程(包括从一些 HTTP 资源中检索文本并将它们插入到一个独立的 HashMap),最后在这些 HashMap 上计算一些数字。主要片段如下所示:
for (int i = 0; i < mylist.size(); i++) {
long startepoch = getTime(mylist.get(i).time);
MyItem m = mylist.get(i);
String index=(i+1)+"";
process1(index, m.name, startepoch, m.duration);
//adds to hashmap1
if(m.name.equals("TEST")) {
process2(index, m.name, startepoch, m.duration);
//adds to hashmap2
} else {
process3(index, m.name, startepoch, m.duration);
//adds to hashmap3
process4(index, m.name, startepoch, m.duration);
//adds to hashmap4
process5(index, m.name, startepoch, m.duration);
//adds to hashmap5
process6(index, m.name, startepoch, m.duration);
//adds to hashmap6
}
}
// then start calculation on all hashmaps
calculate_all();
由于目前此代码段是按顺序执行的,因此对于包含 500 个项目的列表,这可能需要 30 分钟左右的时间。我怎样才能对我的代码进行多线程处理以使其更快?并以线程安全的方式?
我尝试使用 ExecutorService executorService = Executors.newFixedThreadPool(10);
,然后通过如下方式包装将每个进程提交给 executorService
,但问题是我不知道他们什么时候完成,所以要调用 calculate_all()
。所以我没有继续。
executorService.submit(new Runnable() {
public void run() {
process2(index, m.name, startepoch, m.duration);
}
});
有更好的工作思路吗?
but the problem was I couldn't know when they finish
当您向 Executor 提交内容时,您会返回一个包含结果(如果有)的 Future
。
然后您可以从您的主线程调用 Future::get
以等待这些结果(或者只是您的情况的完成)。
List<Future<?>> completions = executor.invokeAll(tasks);
// later, when you need to wait for completion
for(Future<?> c: completions) c.get();
您需要注意的另一件事是如何存储结果。如果您计划让您的任务将它们放入某个共享数据结构中,请确保该任务是线程安全的。从 Runnable
更改为 Callable
可能更容易,以便任务可以 return 结果(稍后您可以在主线程上以单线程方式合并)。
请注意,多线程并不一定能提高速度。多线程主要用于通过防止不必要的睡眠等来减少空闲 CPU 周期。
对于您提供的内容,我无能为力,但是,我认为您可以先做这样的事情:
- 使用线程安全的数据结构。这是必须的。如果你错过了这个 步骤,你的软件最终会崩溃。你会有一个 很难找到原因。 (例如,如果你有一个 ArrayList, 使用线程安全的)
- 您可以通过删除 for 循环开始尝试多线程 并为每次执行使用一个线程。如果你的 for 循环 大小超过你的线程数,你将拥有 将它们排入队列。
- 您有一个最终计算需要所有其他线程 结束。您可以使用 CountDownLatch、wait()/notifyAll() 或 synchronized() 取决于你的实现。
- 执行你的最终计算。
编辑
回应 (2):
您当前的执行是这样的:
for (int i = 0; i < mylist.size(); i++) {
some_processes();
}
// then start calculation on all hashmaps
calculate_all();
现在,要删除 "for" 循环,您可以先从增加 "for" 循环开始。例如:
// Assuming mylist.size() is around 500 and you want, say 5, hardcoded multi-thrads
Thread_1:
for (int i = 0; i < 100; i++) {
some_processes();
}
Thread_2:
for (int i = 100; i < 200; i++) {
some_processes();
}
Thread_3:
for (int i = 200; i < 300; i++) {
some_processes();
}
Thread_4:
for (int i = 300; i < 400; i++) {
some_processes();
}
Thread_5:
for (int i = 400; i < mylist.size(); i++) {
some_processes();
}
// Now you can use these threads as such:
CountDownLatch latch = new CountDownLatch(5);
ExecutorService executor = Executors.newFixedThreadPool(5);
executor.submit(new Thread1(latch));
executor.submit(new Thread2(latch));
executor.submit(new Thread3(latch));
executor.submit(new Thread4(latch));
executor.submit(new Thread5(latch));
try {
latch.await(); // wait until latch counted down to 0
} catch (InterruptedException e) {
e.printStackTrace();
}
// then start calculation on all hashmaps
calculate_all();
如您所见,此方法有几个缺点。例如,如果列表大小变为 380 怎么办?然后你有一个空闲线程。另外,如果你想要超过 5 个线程怎么办?
所以在这一点上,您可以通过让循环越来越少来进一步增加 "for" 循环的数量。最多,"for loop count" == "thread count",有效地删除了你的 for 循环。所以从技术上讲,您需要 "mylist.size()" 个线程。您可以这样实现:
// Allow a maximum amount of threads, say mylist.size(). I used LinkedBlockingDeque here because you might choose something lower than mylist.size().
BlockingQueue<String> blockingQueue = new LinkedBlockingDeque<>(mylist.size());
CountDownLatch latch = new CountDownLatch(mylist.size());
new Thread(new add_some_processes_w_single_loop_for_loop_to_queue(queue, latch)).start();
new Thread(new take_finished_processes_from_queue(queue)).start();
try {
latch.await(); // wait until latch counted down to 0
} catch (InterruptedException e) {
e.printStackTrace();
}
// then start calculation on all hashmaps
calculate_all();
请注意,通过这种安排,我们已经删除了您的初始 "for" 循环,而是创建了另一个循环,该循环仅在队列为空时提交新线程。您可以使用生产者和消费者应用程序检查 BlockingQueue 示例。例如参见:BlockingQueue examples
编辑 2
Future
的简单实现可能如下所示:
ExecutorService executorService = Executors.newCachedThreadPool();
Future future1, future2, future3, future4, future5, future6;
for (int i = 0; i < mylist.size(); i++) {
long startepoch = getTime(mylist.get(i).time);
MyItem m = mylist.get(i);
String index=(i+1)+"";
future1 = executorService.submit(new Callable() {...})
//adds to hashmap1
future1.get(); // Add this if you need to wait for process1 to finish before moving on to others. Also, add a try{}catch{} block as shown below.
if(m.name.equals("TEST")) {
future2 = executorService.submit(new Callable() {...})
//adds to hashmap2
future2.get(); // Add this if you need to wait for process2 to finish before moving on to others. Also, add a try{}catch{} block as shown below.
} else {
future3 = executorService.submit(new Callable() {...})
//adds to hashmap3
future4 = executorService.submit(new Callable() {...})
//adds to hashmap4
future5 = executorService.submit(new Callable() {...})
//adds to hashmap5
future6 = executorService.submit(new Callable() {...})
//adds to hashmap6
// Add extra future.get here as above...
}
}
// then start calculation on all hashmaps
calculate_all();
不要忘记添加 try-catch 块,否则您可能无法从异常中恢复并崩溃。
// Example try-catch block surrounding a Future.get().
try {
Object result = future.get();
} catch (ExecutionException e) {
//Do something
} catch (InterruptedException e) {
//Do something
}
但是,您可以有一个更复杂的,如图 here。 link 也解释了 Thilo 的回答。