Java 如何在并行处理中处理对象列表
How to process a list of objects in parallel processing in Java
我在 Java 中有一个对象列表,就像列表中的数千个对象一样,我正在为每个对象迭代列表并进行进一步处理。每个对象都在进行相同的处理。这种 sequentail 方法需要花费很多时间来处理,所以我想在 Java 中实现并行处理。我检查了 Java 中的执行器框架,但我被困在里面了。
我想到了一种方法来实现我的要求。
我想实现一些固定数量的最小对象,每个线程将处理这些对象,以便每个线程快速完成其工作和处理对象。我怎样才能做到这一点?或者如果有任何其他方法可以实现我的要求,请分享。
例如:
列表对象 = new List();
For(对象对象:对象){
//对所有人做一些共同的操作
对象
}
有很多并行处理列表的选项:
使用并行流:
objects.stream().parallel().forEach(object -> {
//Your work on each object goes here, using object
})
如果你想使用比 fork-join 池多线程的池,请使用执行器服务提交任务:
ExecutorService es = Executors.newFixedThreadPool(10);
for(Object o: objects) {
es.submit(() -> {
//code here using Object o...
}
}
前面的示例与传统的执行器服务基本相同,运行 任务在不同的线程上。
作为替代方法,您还可以使用可完成的将来提交:
//You can also just run a for-each and manually add each
//feature to a list
List<CompletableFuture<Void>> futures =
objects.stream().map(object -> CompletableFuture.runAsync(() -> {
//Your work on each object goes here, using object
})
然后,如果需要,您可以使用 futures
对象来检查每次执行的状态。
您可以使用 ThreadPoolExecutor
,它将负责负载平衡。任务将分布在不同的线程上。
这是一个例子:
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class Test {
public static void main(String[] args) {
// Fixed thread number
ExecutorService service = Executors.newFixedThreadPool(10);
// Or un fixed thread number
// The number of threads will increase with tasks
// ExecutorService service = Executors.newCachedThreadPool(10);
List<Object> objects = new ArrayList<>();
for (Object o : objects) {
service.execute(new MyTask(o));
}
// shutdown
// this will get blocked until all task finish
service.shutdown();
try {
service.awaitTermination(Long.MAX_VALUE, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
public static class MyTask implements Runnable {
Object target;
public MyTask(Object target) {
this.target = target;
}
@Override
public void run() {
// business logic at here
}
}
}
Split list into multiple sub-lists and use multi threading to process
each sub-lists parallel.
public class ParallelProcessListElements {
public void processList (int numberofthreads,List<Object>tempList,
Object obj, Method method){
final int sizeofList=tempList.size();
final int sizeofsublist = sizeofList/numberofthreads;
List<Thread> threadlist = new ArrayList<Thread>();
for(int i=0;i<numberofthreads;i++) {
int firstindex = i*sizeofsublist;
int lastindex = i*sizeofsublist+sizeofsublist;
if(i==numberofthreads-1)
lastindex=sizeofList;
List<Object> subList=tempList.subList(firstindex,lastindex );
Thread th = new Thread(()->{
try{method.invoke(obj, subList);}catch(Exception e) {e.printStackTrace();}
});
threadlist.add(th);
}
threadlist.forEach(th->{th.start();try{Thread.sleep(10);}catch(Exception e) {}});
}
}
public class Demo {
public static void main(String[] args) {
List<Object> tempList= new ArrayList<Object>();
/**
* Adding values to list... For Demo purpose..
*/
for(int i=0;i<500;i++)
tempList.add(i);
ParallelProcessListElements process = new ParallelProcessListElements();
final int numberofthreads = 5;
Object obj = new Demo();
Method method=null;
try{ method=Demo.class.getMethod("printList", List.class);}catch(Exception e) {}
/**
* Method Call...
*/
process.processList(numberofthreads,tempList,obj,method);
}
public void printList(List<Integer>list) {
/**
* Business logic to process the list...
*/
list.forEach(item->{
try{Thread.sleep(1000);}catch(Exception e) {}
System.out.println(item);
});
}
}
我在 Java 中有一个对象列表,就像列表中的数千个对象一样,我正在为每个对象迭代列表并进行进一步处理。每个对象都在进行相同的处理。这种 sequentail 方法需要花费很多时间来处理,所以我想在 Java 中实现并行处理。我检查了 Java 中的执行器框架,但我被困在里面了。
我想到了一种方法来实现我的要求。
我想实现一些固定数量的最小对象,每个线程将处理这些对象,以便每个线程快速完成其工作和处理对象。我怎样才能做到这一点?或者如果有任何其他方法可以实现我的要求,请分享。
例如:
列表对象 = new List();
For(对象对象:对象){ //对所有人做一些共同的操作 对象
}
有很多并行处理列表的选项:
使用并行流:
objects.stream().parallel().forEach(object -> {
//Your work on each object goes here, using object
})
如果你想使用比 fork-join 池多线程的池,请使用执行器服务提交任务:
ExecutorService es = Executors.newFixedThreadPool(10);
for(Object o: objects) {
es.submit(() -> {
//code here using Object o...
}
}
前面的示例与传统的执行器服务基本相同,运行 任务在不同的线程上。
作为替代方法,您还可以使用可完成的将来提交:
//You can also just run a for-each and manually add each
//feature to a list
List<CompletableFuture<Void>> futures =
objects.stream().map(object -> CompletableFuture.runAsync(() -> {
//Your work on each object goes here, using object
})
然后,如果需要,您可以使用 futures
对象来检查每次执行的状态。
您可以使用 ThreadPoolExecutor
,它将负责负载平衡。任务将分布在不同的线程上。
这是一个例子:
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class Test {
public static void main(String[] args) {
// Fixed thread number
ExecutorService service = Executors.newFixedThreadPool(10);
// Or un fixed thread number
// The number of threads will increase with tasks
// ExecutorService service = Executors.newCachedThreadPool(10);
List<Object> objects = new ArrayList<>();
for (Object o : objects) {
service.execute(new MyTask(o));
}
// shutdown
// this will get blocked until all task finish
service.shutdown();
try {
service.awaitTermination(Long.MAX_VALUE, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
public static class MyTask implements Runnable {
Object target;
public MyTask(Object target) {
this.target = target;
}
@Override
public void run() {
// business logic at here
}
}
}
Split list into multiple sub-lists and use multi threading to process each sub-lists parallel.
public class ParallelProcessListElements {
public void processList (int numberofthreads,List<Object>tempList,
Object obj, Method method){
final int sizeofList=tempList.size();
final int sizeofsublist = sizeofList/numberofthreads;
List<Thread> threadlist = new ArrayList<Thread>();
for(int i=0;i<numberofthreads;i++) {
int firstindex = i*sizeofsublist;
int lastindex = i*sizeofsublist+sizeofsublist;
if(i==numberofthreads-1)
lastindex=sizeofList;
List<Object> subList=tempList.subList(firstindex,lastindex );
Thread th = new Thread(()->{
try{method.invoke(obj, subList);}catch(Exception e) {e.printStackTrace();}
});
threadlist.add(th);
}
threadlist.forEach(th->{th.start();try{Thread.sleep(10);}catch(Exception e) {}});
}
}
public class Demo {
public static void main(String[] args) {
List<Object> tempList= new ArrayList<Object>();
/**
* Adding values to list... For Demo purpose..
*/
for(int i=0;i<500;i++)
tempList.add(i);
ParallelProcessListElements process = new ParallelProcessListElements();
final int numberofthreads = 5;
Object obj = new Demo();
Method method=null;
try{ method=Demo.class.getMethod("printList", List.class);}catch(Exception e) {}
/**
* Method Call...
*/
process.processList(numberofthreads,tempList,obj,method);
}
public void printList(List<Integer>list) {
/**
* Business logic to process the list...
*/
list.forEach(item->{
try{Thread.sleep(1000);}catch(Exception e) {}
System.out.println(item);
});
}
}