线程池中的多个线程在同一个列表中写入数据
Multiple Threads in a thread pool writing data in same List
我的 运行 中有多个线程 threadPool
每个线程读取一个巨大的文件和 returns 列表中来自该文件的数据。
代码如下:
class Writer{
ArrayList finalListWhereDataWillBeWritten = new Array<Integer>()
for(query q : allQueries){ //all the read queries to read file
threadPool.submit(new GetDataFromFile(fileName,filePath));
}//all the read queries have been submitted.
}
现在我知道下面的代码部分将出现在我的代码中的某个位置,但我不知道将它放在哪里。
因为如果我将它放在 for 循环中的 submit()
之后,它不会添加它,因为每个文件都非常大,可能还没有完成处理。
synchronized(finalListWhereDataWillBeWritten){
//process the data obtained from single file and add it to target list
finalListWhereDataWillBeWritten.addAll(dataFromSingleThread);
}
所以谁能告诉我我应该把这段代码放在哪里,以及我需要确保哪些其他事情不会发生临界区问题。
class GetDataFromFile implements Runnable<List<Integer>>{
private String fileName;
private String filePath;
public List<Integer> run(){
//code for streaming the file fileName
return dataObtainedFromThisFile;
}
}
我是否需要在我的代码中使用 wait()
/ notifyAll()
方法,因为我只是在线程中并行读取文件中的数据并将它们放在共享列表中
更新 请考虑 Marko 提供的答案要好得多
如果要确保在处理列表之前所有线程都已完成,请执行以下操作:
import java.util.List;
import java.util.Vector;
public class ThreadWork {
public static void main(String[] args) {
int count = 5;
Thread[] threads = new ListThread[count];
List<String> masterList = new Vector<String>();
for(int index = 0; index < count; index++) {
threads[index] = new ListThread(masterList, "Thread " + (index + 1));
threads[index].start();
}
while(isOperationRunning(threads)) {
// do nothing
}
System.out.println("Done!! Print Your List ...");
for(String item : masterList){
System.out.println("[" + item + "]");
}
}
private static boolean isOperationRunning(Thread[] threads) {
boolean running = false;
for(Thread thread : threads) {
if(thread.isAlive()) {
running = true;
break;
}
}
return running;
}
}
class ListThread extends Thread {
private static String items[] = { "A", "B", "C", "D"};
private List<String> list;
private String name;
public ListThread(List<String> masterList, String threadName) {
list = masterList;
name = threadName;
}
public void run() {
for(int i = 0; i < items.length;++i) {
randomWait();
String data = "Thread [" + name + "][" + items[i] + "]";
System.out.println( data );
list.add( data );
}
}
private void randomWait() {
try {
Thread.currentThread();
Thread.sleep((long)(3000 * Math.random()));
}
catch (InterruptedException x) {}
}
}
与其重新发明轮子,不如简单地实现 Callable<List<Integer>>
并将其提交给 JDK 的标准执行程序服务。然后,随着期货的完成,您将结果收集到列表中。
final ExecutorService threadPool =
Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
final List<Future<List<Integer>>> futures = new ArrayList<>();
for(query q : allQueries) {
futures.add(threadPool.submit(new GetDataFromFile(fileName, filePath)));
}
for (Future<List<Integer>> f : futures) {
finalListWhereDataWillBeWritten.addAll(f.get());
}
这一切都假设你低于 Java 8。对于 Java 8,你当然会使用并行流:
final List<Integer> finalListWhereDataWillBeWritten =
allQueries.parallelStream()
.flatMap(q -> getDataFromFile(q.fileName, q.filePath))
.collect(toList());
我的 运行 中有多个线程 threadPool
每个线程读取一个巨大的文件和 returns 列表中来自该文件的数据。
代码如下:
class Writer{
ArrayList finalListWhereDataWillBeWritten = new Array<Integer>()
for(query q : allQueries){ //all the read queries to read file
threadPool.submit(new GetDataFromFile(fileName,filePath));
}//all the read queries have been submitted.
}
现在我知道下面的代码部分将出现在我的代码中的某个位置,但我不知道将它放在哪里。
因为如果我将它放在 for 循环中的 submit()
之后,它不会添加它,因为每个文件都非常大,可能还没有完成处理。
synchronized(finalListWhereDataWillBeWritten){
//process the data obtained from single file and add it to target list
finalListWhereDataWillBeWritten.addAll(dataFromSingleThread);
}
所以谁能告诉我我应该把这段代码放在哪里,以及我需要确保哪些其他事情不会发生临界区问题。
class GetDataFromFile implements Runnable<List<Integer>>{
private String fileName;
private String filePath;
public List<Integer> run(){
//code for streaming the file fileName
return dataObtainedFromThisFile;
}
}
我是否需要在我的代码中使用 wait()
/ notifyAll()
方法,因为我只是在线程中并行读取文件中的数据并将它们放在共享列表中
更新 请考虑 Marko 提供的答案要好得多
如果要确保在处理列表之前所有线程都已完成,请执行以下操作:
import java.util.List;
import java.util.Vector;
public class ThreadWork {
public static void main(String[] args) {
int count = 5;
Thread[] threads = new ListThread[count];
List<String> masterList = new Vector<String>();
for(int index = 0; index < count; index++) {
threads[index] = new ListThread(masterList, "Thread " + (index + 1));
threads[index].start();
}
while(isOperationRunning(threads)) {
// do nothing
}
System.out.println("Done!! Print Your List ...");
for(String item : masterList){
System.out.println("[" + item + "]");
}
}
private static boolean isOperationRunning(Thread[] threads) {
boolean running = false;
for(Thread thread : threads) {
if(thread.isAlive()) {
running = true;
break;
}
}
return running;
}
}
class ListThread extends Thread {
private static String items[] = { "A", "B", "C", "D"};
private List<String> list;
private String name;
public ListThread(List<String> masterList, String threadName) {
list = masterList;
name = threadName;
}
public void run() {
for(int i = 0; i < items.length;++i) {
randomWait();
String data = "Thread [" + name + "][" + items[i] + "]";
System.out.println( data );
list.add( data );
}
}
private void randomWait() {
try {
Thread.currentThread();
Thread.sleep((long)(3000 * Math.random()));
}
catch (InterruptedException x) {}
}
}
与其重新发明轮子,不如简单地实现 Callable<List<Integer>>
并将其提交给 JDK 的标准执行程序服务。然后,随着期货的完成,您将结果收集到列表中。
final ExecutorService threadPool =
Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
final List<Future<List<Integer>>> futures = new ArrayList<>();
for(query q : allQueries) {
futures.add(threadPool.submit(new GetDataFromFile(fileName, filePath)));
}
for (Future<List<Integer>> f : futures) {
finalListWhereDataWillBeWritten.addAll(f.get());
}
这一切都假设你低于 Java 8。对于 Java 8,你当然会使用并行流:
final List<Integer> finalListWhereDataWillBeWritten =
allQueries.parallelStream()
.flatMap(q -> getDataFromFile(q.fileName, q.filePath))
.collect(toList());