Java 多线程连接问题
Java multiple thread join issue
所以我需要使用线程(已经拆分)处理几个数据文件,我遇到了如何停止主线程直到所有子线程完成的问题。
我环顾四周并尝试使用 join() 但这会导致问题:
- 如果我只让主线程 join 最后一个线程,那么由于其他线程是同时运行的,最后一个线程并不总是最后一个完成
- 如果我让主线程依次 join 每一个线程,它们就不会同时运行了,第二个线程要等第一个线程先完成。
还尝试了 wait() 和 notify() 但有更多问题。这是我的代码的一部分
/**
 * Excerpt of the asker's Matrix class: holds the shared state that the
 * RunThread workers read and write, and launches those workers.
 */
public class Matrix extends MapReduce {
// Handed to every RunThread, which parses its elements as doubles.
ArrayList<String> VecteurLines = new ArrayList<String>();
// Two counters updated by RunThread.run(): [0] is incremented once per row read,
// [1] is set to the number of comma-separated values in the most recent row.
protected int[] nbrLnCol = {0,0};
// Result slots written by RunThread.run() at index threadNbr-1.
// NOTE(review): not allocated anywhere in this excerpt - confirm where it is sized.
protected static double[] res;
// Forwards n to the MapReduce constructor (not visible in this excerpt).
public Matrix(String n) {
super(n);
}
// Forwards n and m to the MapReduce constructor (not visible in this excerpt).
public Matrix(String n,String m){
super(n,m);
}
/**
 * Creates and starts one thread per index 1..Chunks, each running a RunThread
 * built from VecteurLines, the index and this Matrix.
 *
 * <p>The method returns as soon as the last thread has been started; nothing in
 * this body waits for the workers to finish, which is the problem the question
 * describes.
 */
public void Reduce() throws IOException, InterruptedException, MatrixException {
for (int i = 1; i <= Chunks; i++) {
// The Thread reference is dropped after start(), so it cannot be joined later.
Thread t=new Thread(new RunThread(VecteurLines,i,this));
t.start();
}
}
这里是处理线程的class
/**
 * Runnable that processes one chunk file on behalf of a Matrix.
 *
 * <p>It extends Matrix (constructed with two empty strings) but does its reading
 * and writing through the separate {@code ma} instance passed to the constructor.
 */
public class RunThread extends Matrix implements Runnable {
// Matrix whose getNom() value prefixes the file name and whose nbrLnCol is updated.
Matrix ma;
// Values each parsed row is multiplied against, element by element.
ArrayList<String> vec;
// Chunk number: part of the input file name and, minus one, the index into res.
int threadNbr;
/**
 * @param vec       values multiplied against each row of the chunk file
 * @param threadNbr chunk number used for the file name and the res slot
 * @param ma        Matrix providing getNom() and receiving the nbrLnCol updates
 */
public RunThread(ArrayList<String> vec, int threadNbr,Matrix ma) {
super("","");
this.vec=vec;this.threadNbr=threadNbr;this.ma=ma; }
/**
 * Reads the file {@code ma.getNom() + threadNbr + ".txt"} token by token, treats
 * each token as a comma-separated row, stores the sum of the element-wise
 * products of that row with {@code vec} in {@code res[threadNbr-1]}, and finally
 * deletes the file.
 */
@Override
public void run() {
FileInputStream fin = null;
try {
fin = new FileInputStream(ma.getNom()+threadNbr+".txt");
} catch (FileNotFoundException e) {
// NOTE(review): fin stays null after this, so the Scanner constructor below
// will throw NullPointerException when the file is missing.
e.printStackTrace();
}
Scanner sc = new Scanner(fin);
// Scanner's default delimiter is whitespace, so each token is one row.
while (sc.hasNext()) {
String nextString = sc.next();
// NOTE(review): unsynchronized update; Matrix.Reduce passes the same Matrix
// to every worker, so concurrent increments can be lost.
ma.nbrLnCol[0]++;
String [] arr = nextString.split(",");
ma.nbrLnCol[1]=arr.length;
double c=0;
for(int j=0;j<arr.length;j++)
{
c+=(Double.parseDouble(arr[j])*Double.parseDouble(vec.get(j)));
}
// Overwritten on every row, so only the last row's value remains in this slot.
res[threadNbr-1]=c;
}
sc.close();
try {
fin.close();
} catch (IOException e) {
e.printStackTrace();
}
// The chunk file is removed once it has been processed.
File file = new File(ma.getNom()+threadNbr+".txt");
file.delete();
}
这样试试:
/** Worker threads created by the most recent call to Reduce(). */
private List<Thread> threadList = new ArrayList<>();

/**
 * Starts one worker thread per index 1..Chunks and blocks until none of them is
 * alive any more, checking every 50 ms.
 *
 * <p>If the calling thread is interrupted while waiting, its interrupt flag is
 * restored and the method returns early; workers may still be running then.
 */
public void Reduce() {
    threadList.clear();
    for (int i = 1; i <= Chunks; i++) {
        threadList.add(new Thread(new RunThread(VecteurLines, i, this)));
    }
    // Start every worker before waiting on any of them so they run concurrently.
    for (Thread worker : threadList) {
        worker.start();
    }
    // Poll until no worker is alive. With no workers at all (Chunks < 1) the loop
    // is skipped instead of spinning forever.
    try {
        while (anyWorkerAlive()) {
            Thread.sleep(50);
        }
    } catch (InterruptedException e) {
        // Reduce() declares no checked exceptions, so hand the interrupt back
        // to the caller through the thread's interrupt flag.
        Thread.currentThread().interrupt();
    }
}

/** Returns true while at least one thread in threadList has not terminated. */
private boolean anyWorkerAlive() {
    for (Thread worker : threadList) {
        // Null is tested before the dereference, not after it.
        if (worker != null && worker.isAlive()) {
            return true;
        }
    }
    return false;
}
或
/**
 * Starts one worker thread per index 1..Chunks, lets them run concurrently, and
 * blocks until every one of them has terminated.
 *
 * <p>If the calling thread is interrupted while waiting, its interrupt flag is
 * restored and the method returns early; workers may still be running then.
 */
public void Reduce() {
    List<Thread> threadList = new ArrayList<>();
    for (int i = 1; i <= Chunks; i++) {
        threadList.add(new Thread(new RunThread(VecteurLines, i, this)));
    }
    // Start all workers first. Calling join() right after each start() would
    // make every worker wait for the one before it, i.e. run them one by one.
    for (Thread worker : threadList) {
        worker.start();
    }
    // Only now wait for them; the order of the joins does not matter because
    // join() on an already finished thread returns immediately.
    try {
        for (Thread worker : threadList) {
            worker.join();
        }
    } catch (InterruptedException e) {
        // Reduce() declares no checked exceptions, so hand the interrupt back
        // to the caller through the thread's interrupt flag.
        Thread.currentThread().interrupt();
    }
}
我相信您的代码需要两点:
你的主线程必须在所有线程执行后最后结束,因为你说
"how to stop the main thread till all the subthreads finish"
。
其次,线程应该一个接一个地完成,即第二个线程应该在第一个线程之后完成,如您所说
"the second needs the first to finish first."
这是我使用 join 执行此操作的代码。
public class Matrix extends MapReduce {
ArrayList<String> VecteurLines = new ArrayList<String>();
protected int[] nbrLnCol = {0,0};
protected static double[] res;
public Matrix(String n) {
super(n);
}
public Matrix(String n,String m){
super(n,m);
}
public void Reduce() throws IOException, InterruptedException, MatrixException {
Thread t = null;
for (int i = 1; i <= Chunks; i++) {
Thread t=new Thread(new RunThread(t,VecteurLines,i,this));
t.start();
}
t.join(); // finally main thread joining with the last thread.
}
和
public class RunThread extends Matrix implements Runnable {
Matrix ma;
ArrayList<String> vec;
int threadNbr;
Thread t;
public RunThread(t,ArrayList<String> vec, int threadNbr,Matrix ma) {
this.t = t;
super("","");
this.vec=vec;this.threadNbr=threadNbr;this.ma=ma; }
@Override
public void run() {
FileInputStream fin = null;
try {
fin = new FileInputStream(ma.getNom()+threadNbr+".txt");
} catch (FileNotFoundException e) {
e.printStackTrace();
}
Scanner sc = new Scanner(fin);
while (sc.hasNext()) {
String nextString = sc.next();
ma.nbrLnCol[0]++;
String [] arr = nextString.split(",");
ma.nbrLnCol[1]=arr.length;
double c=0;
for(int j=0;j<arr.length;j++)
{
c+=(Double.parseDouble(arr[j])*Double.parseDouble(vec.get(j)));
}
res[threadNbr-1]=c;
}
sc.close();
try {
fin.close();
} catch (IOException e) {
e.printStackTrace();
}
File file = new File(ma.getNom()+threadNbr+".txt");
file.delete();
if(t!=null){
t.join(); //join with the previous thread eg. thread2 joining with thread1
}
}
所以我需要使用线程(已经拆分)处理几个数据文件,我遇到了如何停止主线程直到所有子线程完成的问题。 我环顾四周并尝试使用 join() 但这会导致问题:
- 如果我只让主线程 join 最后一个线程,那么由于其他线程是同时运行的,最后一个线程并不总是最后一个完成
- 如果我让主线程依次 join 每一个线程,它们就不会同时运行了,第二个线程要等第一个线程先完成。 还尝试了 wait() 和 notify() 但有更多问题。这是我的代码的一部分
/**
 * Excerpt of the asker's Matrix class: holds the shared state that the
 * RunThread workers read and write, and launches those workers.
 */
public class Matrix extends MapReduce {
// Handed to every RunThread, which parses its elements as doubles.
ArrayList<String> VecteurLines = new ArrayList<String>();
// Two counters updated by RunThread.run(): [0] is incremented once per row read,
// [1] is set to the number of comma-separated values in the most recent row.
protected int[] nbrLnCol = {0,0};
// Result slots written by RunThread.run() at index threadNbr-1.
// NOTE(review): not allocated anywhere in this excerpt - confirm where it is sized.
protected static double[] res;
// Forwards n to the MapReduce constructor (not visible in this excerpt).
public Matrix(String n) {
super(n);
}
// Forwards n and m to the MapReduce constructor (not visible in this excerpt).
public Matrix(String n,String m){
super(n,m);
}
/**
 * Creates and starts one thread per index 1..Chunks, each running a RunThread
 * built from VecteurLines, the index and this Matrix.
 *
 * <p>The method returns as soon as the last thread has been started; nothing in
 * this body waits for the workers to finish, which is the problem the question
 * describes.
 */
public void Reduce() throws IOException, InterruptedException, MatrixException {
for (int i = 1; i <= Chunks; i++) {
// The Thread reference is dropped after start(), so it cannot be joined later.
Thread t=new Thread(new RunThread(VecteurLines,i,this));
t.start();
}
}
这里是处理线程的class
/**
 * Runnable that processes one chunk file on behalf of a Matrix.
 *
 * <p>It extends Matrix (constructed with two empty strings) but does its reading
 * and writing through the separate {@code ma} instance passed to the constructor.
 */
public class RunThread extends Matrix implements Runnable {
// Matrix whose getNom() value prefixes the file name and whose nbrLnCol is updated.
Matrix ma;
// Values each parsed row is multiplied against, element by element.
ArrayList<String> vec;
// Chunk number: part of the input file name and, minus one, the index into res.
int threadNbr;
/**
 * @param vec       values multiplied against each row of the chunk file
 * @param threadNbr chunk number used for the file name and the res slot
 * @param ma        Matrix providing getNom() and receiving the nbrLnCol updates
 */
public RunThread(ArrayList<String> vec, int threadNbr,Matrix ma) {
super("","");
this.vec=vec;this.threadNbr=threadNbr;this.ma=ma; }
/**
 * Reads the file {@code ma.getNom() + threadNbr + ".txt"} token by token, treats
 * each token as a comma-separated row, stores the sum of the element-wise
 * products of that row with {@code vec} in {@code res[threadNbr-1]}, and finally
 * deletes the file.
 */
@Override
public void run() {
FileInputStream fin = null;
try {
fin = new FileInputStream(ma.getNom()+threadNbr+".txt");
} catch (FileNotFoundException e) {
// NOTE(review): fin stays null after this, so the Scanner constructor below
// will throw NullPointerException when the file is missing.
e.printStackTrace();
}
Scanner sc = new Scanner(fin);
// Scanner's default delimiter is whitespace, so each token is one row.
while (sc.hasNext()) {
String nextString = sc.next();
// NOTE(review): unsynchronized update; Matrix.Reduce passes the same Matrix
// to every worker, so concurrent increments can be lost.
ma.nbrLnCol[0]++;
String [] arr = nextString.split(",");
ma.nbrLnCol[1]=arr.length;
double c=0;
for(int j=0;j<arr.length;j++)
{
c+=(Double.parseDouble(arr[j])*Double.parseDouble(vec.get(j)));
}
// Overwritten on every row, so only the last row's value remains in this slot.
res[threadNbr-1]=c;
}
sc.close();
try {
fin.close();
} catch (IOException e) {
e.printStackTrace();
}
// The chunk file is removed once it has been processed.
File file = new File(ma.getNom()+threadNbr+".txt");
file.delete();
}
这样试试:
/** Worker threads created by the most recent call to Reduce(). */
private List<Thread> threadList = new ArrayList<>();

/**
 * Starts one worker thread per index 1..Chunks and blocks until none of them is
 * alive any more, checking every 50 ms.
 *
 * <p>If the calling thread is interrupted while waiting, its interrupt flag is
 * restored and the method returns early; workers may still be running then.
 */
public void Reduce() {
    threadList.clear();
    for (int i = 1; i <= Chunks; i++) {
        threadList.add(new Thread(new RunThread(VecteurLines, i, this)));
    }
    // Start every worker before waiting on any of them so they run concurrently.
    for (Thread worker : threadList) {
        worker.start();
    }
    // Poll until no worker is alive. With no workers at all (Chunks < 1) the loop
    // is skipped instead of spinning forever.
    try {
        while (anyWorkerAlive()) {
            Thread.sleep(50);
        }
    } catch (InterruptedException e) {
        // Reduce() declares no checked exceptions, so hand the interrupt back
        // to the caller through the thread's interrupt flag.
        Thread.currentThread().interrupt();
    }
}

/** Returns true while at least one thread in threadList has not terminated. */
private boolean anyWorkerAlive() {
    for (Thread worker : threadList) {
        // Null is tested before the dereference, not after it.
        if (worker != null && worker.isAlive()) {
            return true;
        }
    }
    return false;
}
或
/**
 * Starts one worker thread per index 1..Chunks, lets them run concurrently, and
 * blocks until every one of them has terminated.
 *
 * <p>If the calling thread is interrupted while waiting, its interrupt flag is
 * restored and the method returns early; workers may still be running then.
 */
public void Reduce() {
    List<Thread> threadList = new ArrayList<>();
    for (int i = 1; i <= Chunks; i++) {
        threadList.add(new Thread(new RunThread(VecteurLines, i, this)));
    }
    // Start all workers first. Calling join() right after each start() would
    // make every worker wait for the one before it, i.e. run them one by one.
    for (Thread worker : threadList) {
        worker.start();
    }
    // Only now wait for them; the order of the joins does not matter because
    // join() on an already finished thread returns immediately.
    try {
        for (Thread worker : threadList) {
            worker.join();
        }
    } catch (InterruptedException e) {
        // Reduce() declares no checked exceptions, so hand the interrupt back
        // to the caller through the thread's interrupt flag.
        Thread.currentThread().interrupt();
    }
}
我相信您的代码需要两点: 你的主线程必须在所有线程执行后最后结束,因为你说
"how to stop the main thread till all the subthreads finish"
。 其次,线程应该一个接一个地完成,即第二个线程应该在第一个线程之后完成,如您所说
"the second needs the first to finish first."
这是我使用 join 执行此操作的代码。
public class Matrix extends MapReduce {
ArrayList<String> VecteurLines = new ArrayList<String>();
protected int[] nbrLnCol = {0,0};
protected static double[] res;
public Matrix(String n) {
super(n);
}
public Matrix(String n,String m){
super(n,m);
}
public void Reduce() throws IOException, InterruptedException, MatrixException {
Thread t = null;
for (int i = 1; i <= Chunks; i++) {
Thread t=new Thread(new RunThread(t,VecteurLines,i,this));
t.start();
}
t.join(); // finally main thread joining with the last thread.
}
和
public class RunThread extends Matrix implements Runnable {
Matrix ma;
ArrayList<String> vec;
int threadNbr;
Thread t;
public RunThread(t,ArrayList<String> vec, int threadNbr,Matrix ma) {
this.t = t;
super("","");
this.vec=vec;this.threadNbr=threadNbr;this.ma=ma; }
@Override
public void run() {
FileInputStream fin = null;
try {
fin = new FileInputStream(ma.getNom()+threadNbr+".txt");
} catch (FileNotFoundException e) {
e.printStackTrace();
}
Scanner sc = new Scanner(fin);
while (sc.hasNext()) {
String nextString = sc.next();
ma.nbrLnCol[0]++;
String [] arr = nextString.split(",");
ma.nbrLnCol[1]=arr.length;
double c=0;
for(int j=0;j<arr.length;j++)
{
c+=(Double.parseDouble(arr[j])*Double.parseDouble(vec.get(j)));
}
res[threadNbr-1]=c;
}
sc.close();
try {
fin.close();
} catch (IOException e) {
e.printStackTrace();
}
File file = new File(ma.getNom()+threadNbr+".txt");
file.delete();
if(t!=null){
t.join(); //join with the previous thread eg. thread2 joining with thread1
}
}