Java 多线程 join 问题

Java multiple thread join issue

我需要使用多个线程来处理若干数据文件(文件已经拆分好),但遇到了一个问题:如何让主线程等待,直到所有子线程都执行完毕。我查阅了一些资料并尝试使用 join(),但这会导致问题:


        /**
         * Matrix job built on top of {@code MapReduce}; holds the state that the
         * {@code RunThread} workers read and update while processing chunk files.
         */
        public class Matrix extends MapReduce {
        // Vector entries kept as strings; workers parse each one with Double.parseDouble.
        ArrayList<String> VecteurLines = new ArrayList<String>();
        // [0] is incremented for every token a worker reads; [1] is set to the number of
        // comma-separated values in the most recently read token.
        protected int[] nbrLnCol = {0,0};
        // Workers store their result at index (threadNbr - 1). Static, so shared by every
        // instance; it is not allocated anywhere in this snippet.
        protected static double[] res;

        /**
         * Forwards {@code n} to the single-argument {@code MapReduce} constructor.
         *
         * @param n passed through unchanged; presumably the base file name (workers build
         *          chunk paths from {@code getNom()}) -- confirm against MapReduce
         */
        public Matrix(String n) {
            super(n);
        }
        /**
         * Forwards both arguments to the two-argument {@code MapReduce} constructor.
         *
         * @param n passed through unchanged; meaning is defined by MapReduce (not shown here)
         * @param m passed through unchanged; meaning is defined by MapReduce (not shown here)
         */
        public Matrix(String n,String m){
            super(n,m);
        }
    /**
     * Launches one {@code RunThread} worker per chunk, numbered from 1 to {@code Chunks}.
     *
     * <p>The workers are only started here; this method returns immediately and does
     * not wait for any of them to finish.
     */
    public void Reduce() throws IOException, InterruptedException, MatrixException {
        int chunk = 1;
        while (chunk <= Chunks) {
            // Each worker receives the shared vector, its 1-based chunk number and this matrix.
            Runnable job = new RunThread(VecteurLines, chunk, this);
            new Thread(job).start();
            chunk++;
        }
    }

下面是负责线程处理的类:


    /**
     * Runnable worker that processes one chunk file on behalf of a {@code Matrix}.
     *
     * NOTE(review): the class extends Matrix but works on the separate {@code ma}
     * instance; the superclass state created by {@code super("","")} is not used by
     * the code visible here.
     */
    public class RunThread extends Matrix implements Runnable {
            // Matrix whose getNom() and nbrLnCol this worker uses.
            Matrix ma;
            // Vector entries as strings, parsed with Double.parseDouble in run().
            ArrayList<String> vec;
            // 1-based worker number; selects the chunk file and the slot in res.
            int threadNbr;


            /**
             * @param vec       vector entries, stored as given (not copied)
             * @param threadNbr 1-based worker number
             * @param ma        matrix this worker reads from and updates
             */
            public RunThread(ArrayList<String> vec, int threadNbr,Matrix ma)  {
                super("","");
                this.vec=vec;this.threadNbr=threadNbr;this.ma=ma; }

            /**
             * Processes this worker's chunk file, named {@code ma.getNom() + threadNbr + ".txt"}.
             *
             * <p>Every whitespace-delimited token is treated as one comma-separated row. The
             * row is multiplied element-wise with {@code vec}, and the sum is stored in
             * {@code res[threadNbr - 1]}. The chunk file is deleted once it has been read.
             *
             * <p>If the chunk file cannot be opened, the error is printed and the worker
             * returns without touching any shared state.
             *
             * NOTE(review): {@code res[threadNbr - 1]} is overwritten for every row, so only
             * the last row's sum survives -- presumably each chunk holds a single row; confirm.
             * NOTE(review): {@code ma.nbrLnCol} is updated without synchronization while
             * several workers share the same {@code ma}.
             */
            @Override
            public void run() {
                String chunkPath = ma.getNom() + threadNbr + ".txt";

                // try-with-resources closes both the stream and the scanner even when
                // parsing throws, and avoids building a Scanner on a null stream.
                try (FileInputStream fin = new FileInputStream(chunkPath);
                     Scanner sc = new Scanner(fin)) {
                    while (sc.hasNext()) {
                        String nextString = sc.next();

                        ma.nbrLnCol[0]++;
                        String[] arr = nextString.split(",");
                        ma.nbrLnCol[1] = arr.length;

                        double c = 0;
                        for (int j = 0; j < arr.length; j++) {
                            c += Double.parseDouble(arr[j]) * Double.parseDouble(vec.get(j));
                        }

                        res[threadNbr - 1] = c;
                    }
                } catch (FileNotFoundException e) {
                    // Nothing was read, so there is nothing to clean up.
                    e.printStackTrace();
                    return;
                } catch (IOException e) {
                    // Failure while closing: report it and still remove the chunk file.
                    e.printStackTrace();
                }

                new File(chunkPath).delete();
            }

这样试试:

 /** Worker threads created by the most recent call to Reduce(). */
 private List<Thread> threadList = new ArrayList<>();

 /**
  * Runs one {@code RunThread} worker per chunk concurrently and blocks until every
  * worker has terminated.
  *
  * <p>If the calling thread is interrupted while waiting, the interrupt flag is
  * restored and the method returns early; workers that are still running are left
  * to finish on their own.
  */
 public void Reduce() {
     threadList.clear();
     for (int i = 1; i <= Chunks; i++) {
         threadList.add(new Thread(new RunThread(VecteurLines, i, this)));
     }

     // Start every worker before waiting on any of them so they run in parallel.
     for (Thread worker : threadList) {
         worker.start();
     }

     // join() blocks until the thread has died, so no isAlive() polling or sleeping is needed.
     try {
         for (Thread worker : threadList) {
             worker.join();
         }
     } catch (InterruptedException e) {
         // Preserve the interruption for the caller instead of swallowing it.
         Thread.currentThread().interrupt();
     }
 }

 /**
  * Runs one {@code RunThread} worker per chunk strictly one after another: each
  * worker is started and then joined before the next one is started, so worker
  * {@code i + 1} never begins until worker {@code i} has finished.
  *
  * <p>If the calling thread is interrupted while waiting, the interrupt flag is
  * restored and the remaining workers are not started.
  */
 public void Reduce() {
     List<Thread> threadList = new ArrayList<>();
     for (int i = 1; i <= Chunks; i++) {
         threadList.add(new Thread(new RunThread(VecteurLines, i, this)));
     }

     // Start a worker, wait for it to die, then move on to the next one.
     for (Thread worker : threadList) {
         worker.start();
         try {
             worker.join();
         } catch (InterruptedException e) {
             // Preserve the interruption for the caller and stop launching workers.
             Thread.currentThread().interrupt();
             return;
         }
     }
 }

我认为您的代码需要满足两点:首先,主线程必须在所有子线程执行完毕之后才结束,因为您说过:

"how to stop the main thread till all the subthreads finish"

其次,各线程应该依次完成,即第二个线程应当在第一个线程之后完成,正如您所说:

"the second needs the first to finish first."

下面是我使用 join() 实现这一点的代码:

/**
 * Matrix job built on top of {@code MapReduce}; holds the state that the
 * {@code RunThread} workers read and update while processing chunk files.
 */
public class Matrix extends MapReduce {
    // Vector entries kept as strings; workers parse each one with Double.parseDouble.
    ArrayList<String> VecteurLines = new ArrayList<String>();
    // [0] is incremented for every token a worker reads; [1] is set to the number of
    // comma-separated values in the most recently read token.
    protected int[] nbrLnCol = {0,0};
    // Workers store their result at index (threadNbr - 1). Static, so shared by every
    // instance; it is not allocated anywhere in this snippet.
    protected static double[] res;

    /**
     * Forwards {@code n} to the single-argument {@code MapReduce} constructor.
     *
     * @param n passed through unchanged; presumably the base file name (workers build
     *          chunk paths from {@code getNom()}) -- confirm against MapReduce
     */
    public Matrix(String n) {
        super(n);
    }
    /**
     * Forwards both arguments to the two-argument {@code MapReduce} constructor.
     *
     * @param n passed through unchanged; meaning is defined by MapReduce (not shown here)
     * @param m passed through unchanged; meaning is defined by MapReduce (not shown here)
     */
    public Matrix(String n,String m){
        super(n,m);
    }
/**
 * Starts one {@code RunThread} worker per chunk, handing each worker a reference to
 * the previously started thread, then waits for the last worker to terminate.
 *
 * <p>Each worker joins its predecessor before it terminates, so once the last
 * worker has finished, all earlier workers have finished as well.
 *
 * @throws InterruptedException if the calling thread is interrupted while waiting
 */
public void Reduce() throws IOException, InterruptedException, MatrixException {
    Thread previous = null;
    for (int i = 1; i <= Chunks; i++) {
        // The worker must receive the PREVIOUS thread, so the new thread needs its own variable.
        Thread current = new Thread(new RunThread(previous, VecteurLines, i, this));
        current.start();
        previous = current;
    }

    // No worker is started when Chunks < 1; there is nothing to wait for in that case.
    if (previous != null) {
        previous.join(); // finally main thread joining with the last thread.
    }
}

public class RunThread extends Matrix implements Runnable {
        Matrix ma;
        ArrayList<String> vec;
        int threadNbr;
        Thread t;


        public RunThread(t,ArrayList<String> vec, int threadNbr,Matrix ma)  {
            this.t = t;
            super("","");
            this.vec=vec;this.threadNbr=threadNbr;this.ma=ma; }

        /**
         * Processes this worker's chunk file, named {@code ma.getNom() + threadNbr + ".txt"},
         * then waits for the previous worker thread (if any) before terminating.
         *
         * <p>Every whitespace-delimited token is treated as one comma-separated row. The
         * row is multiplied element-wise with {@code vec}, and the sum is stored in
         * {@code res[threadNbr - 1]}. The chunk file is deleted once it has been read.
         *
         * <p>The predecessor is joined in a {@code finally} block, so this thread still
         * terminates after its predecessor even when processing fails.
         *
         * NOTE(review): {@code res[threadNbr - 1]} is overwritten for every row, so only
         * the last row's sum survives -- presumably each chunk holds a single row; confirm.
         * NOTE(review): {@code ma.nbrLnCol} is updated without synchronization while
         * several workers share the same {@code ma}.
         */
        @Override
        public void run() {
            try {
                String chunkPath = ma.getNom() + threadNbr + ".txt";

                // try-with-resources closes both the stream and the scanner even when
                // parsing throws, and avoids building a Scanner on a null stream.
                try (FileInputStream fin = new FileInputStream(chunkPath);
                     Scanner sc = new Scanner(fin)) {
                    while (sc.hasNext()) {
                        String nextString = sc.next();

                        ma.nbrLnCol[0]++;
                        String[] arr = nextString.split(",");
                        ma.nbrLnCol[1] = arr.length;

                        double c = 0;
                        for (int j = 0; j < arr.length; j++) {
                            c += Double.parseDouble(arr[j]) * Double.parseDouble(vec.get(j));
                        }

                        res[threadNbr - 1] = c;
                    }
                } catch (IOException e) {
                    // Covers a missing chunk file (FileNotFoundException) and close failures.
                    e.printStackTrace();
                }

                // Deleting a file that does not exist is a harmless no-op.
                new File(chunkPath).delete();
            } finally {
                if (t != null) {
                    try {
                        t.join(); // join with the previous thread, e.g. thread2 joining with thread1
                    } catch (InterruptedException e) {
                        // run() cannot throw checked exceptions; keep the interrupt flag set.
                        Thread.currentThread().interrupt();
                    }
                }
            }
        }