从不同的线程将值添加到全局集中并等待所有线程完成
add values into global set from different threads and wait for all the threads to finish
我是线程的新手,想知道是否可以使用线程将大数据拆分成小任务,从而减少处理时间
我已将 Set 拆分为多个集合列表,执行程序服务中的每个线程占用该集合并将该集合添加到另一个集合(全局声明)abcSet。
我需要每个线程将对象添加到这个集合中,并且在所有线程完成添加之后,继续必须使用 abcSet 完成的其余工作
下面是示例代码。请帮忙!!
private static final int PARTITIONS_COUNT = 4;
final Set<Abc> newAbcSet = new HashSet<Abc>();
final Set<Abc> abcSet = //data from database
ExecutorService e = Executors.newFixedThreadPool(4);
List<Set<Abc>> theSets = new ArrayList<Set<Abc>>(PARTITIONS_COUNT);
// divide set into 4 different sets for threading purpose
for (int i = 0; i < PARTITIONS_COUNT; i++) {
theSets.add(new HashSet<Abc>());
}
int index = 0;
for (Abc abcObj : abcSet) {
theSets.get(index++ % PARTITIONS_COUNT).add(abcObj);
}
for (final Set<Abc> abcSet1 : theSets) {
e.execute(new Runnable() {
@Override
public void run() {
for (Abc abc : abcSet1) {
//do some modifications with abc and add it to newAbcSet
newAbcSet.add(abc);
}
}
});
}
//Do something with the newAbcSet
在for
后添加while (!e.isTerminated()){}
停止执行:
for (final Set<Abc> abcSet1 : theSets) {
e.execute(new Runnable() {
@Override
public void run() {
for(Abc abc: abcSet1){
//do some modifications with abc and add it to newAbcSet
newAbcSet.add(abc);
}
}
});
}
while (!e.isTerminated()){}
为了防止竞争,你应该使用thread safe Set
implemention,例如:
final Set<Abc> newAbcSet= Collections.synchronizedSet(new HashSet<Abc>());
如果不需要线程池,直接调用shutdown()
and awaitTermination()
e.shutDown(); // previously submitted tasks are not affected
e.awaitTermination(); // Blocks until all tasks have completed execution after a shutdown request, or the timeout occurs, or the current thread is interrupted, whichever happens first.
//Do something with the newAbcSet
如果还需要线程池,参考这个question。
我是线程的新手,想知道是否可以使用线程将大数据拆分成小任务,从而减少处理时间 我已将 Set 拆分为多个集合列表,执行程序服务中的每个线程占用该集合并将该集合添加到另一个集合(全局声明)abcSet。 我需要每个线程将对象添加到这个集合中,并且在所有线程完成添加之后,继续必须使用 abcSet 完成的其余工作 下面是示例代码。请帮忙!!
private static final int PARTITIONS_COUNT = 4;
final Set<Abc> newAbcSet = new HashSet<Abc>();
final Set<Abc> abcSet = //data from database
ExecutorService e = Executors.newFixedThreadPool(4);
List<Set<Abc>> theSets = new ArrayList<Set<Abc>>(PARTITIONS_COUNT);
// divide set into 4 different sets for threading purpose
for (int i = 0; i < PARTITIONS_COUNT; i++) {
theSets.add(new HashSet<Abc>());
}
int index = 0;
for (Abc abcObj : abcSet) {
theSets.get(index++ % PARTITIONS_COUNT).add(abcObj);
}
for (final Set<Abc> abcSet1 : theSets) {
e.execute(new Runnable() {
@Override
public void run() {
for (Abc abc : abcSet1) {
//do some modifications with abc and add it to newAbcSet
newAbcSet.add(abc);
}
}
});
}
//Do something with the newAbcSet
在for
后添加while (!e.isTerminated()){}
停止执行:
for (final Set<Abc> abcSet1 : theSets) {
e.execute(new Runnable() {
@Override
public void run() {
for(Abc abc: abcSet1){
//do some modifications with abc and add it to newAbcSet
newAbcSet.add(abc);
}
}
});
}
while (!e.isTerminated()){}
为了防止竞争,你应该使用thread safe
Set
implemention,例如:final Set<Abc> newAbcSet= Collections.synchronizedSet(new HashSet<Abc>());
如果不需要线程池,直接调用
shutdown()
andawaitTermination()
e.shutDown(); // previously submitted tasks are not affected e.awaitTermination(); // Blocks until all tasks have completed execution after a shutdown request, or the timeout occurs, or the current thread is interrupted, whichever happens first. //Do something with the newAbcSet
如果还需要线程池,参考这个question。