多个 ExecutorService 在 main 之后完成

Multiple ExecutorService finish after main

我需要您提供一些有关我正在执行 POC(概念验证)的方案的意见。 我是 java 中多线程的新手,正在尝试进行一些测试。我的要求是我想使用简单的方式加载数百万条记录 java 然后在对数据进行一些转换后将插入数据库 table。 为此,我想执行一个与完成所有任务相关的简单测试。

目前我想尝试仅在执行程序服务完成后才完成我的主要方法。下面是我试过的代码。 任何人都可以帮助我知道这是否是完成执行程序线程后完成主要方法的正确方法。

非常感谢您的建议。

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class SimpleThreadPool {

    public static void main(String[] args) {
        ExecutorService executor = Executors.newFixedThreadPool(5);
        for (int i = 0; i < 10; i++) {
            executor = Executors.newFixedThreadPool(5);
           // Runnable worker = new WorkerThread("Thread executor :" + i);
            executor.execute(new WorkerThread("Thread executor :" + i));
          }
        executor.shutdown();
        while (!executor.isTerminated()) {
            //System.out.println("Waiting");
        }

        System.out.println("Will start for Executor 1");

        ExecutorService executor1 = Executors.newFixedThreadPool(5);
        for (int i = 0; i < 10; i++) {
           // Runnable worker = new WorkerThread("Thread executor1 :" + i);
            executor1.execute(new WorkerThread("Thread executor1 :" + i));
          }
        executor1.shutdown();
        while (!executor1.isTerminated()) {
            //System.out.println("Waiting");
        }

        System.out.println("Finished all threads");
        //
        String s=null;
        s.toUpperCase();
    }
}

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class WorkerThread implements Runnable {

    private String command;

    public WorkerThread(String s){
        this.command=s;
    }


    public void run() {
        ExecutorService executor2 = Executors.newFixedThreadPool(5);

        Future loadData=executor2.submit(new LoadData());

        System.out.println(" Start. Command = "+command);

        try {
            List listOfData=(List) loadData.get();

            for(int i=0;i<listOfData.size();i++){
                //Thread.sleep(500);
                //System.out.println("Printing the value from list:"+listOfData.get(i));

                ConversionLogic conversion= new ConversionLogic();
                conversion.doConversion(command);
            }



        } catch (InterruptedException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        } catch (ExecutionException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }

        System.out.println(" End."+command);

    }



    public String toString(){
        return this.command;
    }
}

class LoadData implements Callable{

    public List call() throws Exception {

        List<String> dataList= new ArrayList<String>();

        for(int i=0;i<100;i++){
            String data="Data_"+i;
            //System.out.println("Data Added in List:"+data);
            dataList.add(data);
        }
        Thread.sleep(10000);

        return dataList;
    }

}



public class ConversionLogic {

    public void doConversion(String threadName){

        try {

            System.out.println("Converting Data for Thread starts:"+threadName);
            Thread.sleep(5000);
            System.out.println("Converting Data for Thread ends:"+threadName);


        } catch (InterruptedException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
    }
}

所以这是我从下面提供的答案中了解到的:

package Whosebug.test;

import java.util.List;
import java.util.concurrent.*;

class SimpleThreadPool {

    public static void main(String[] args) throws InterruptedException, ExecutionException {

        ExecutorService executor = Executors.newFixedThreadPool(10);
        ExecutorCompletionService<List> processor = new ExecutorCompletionService<List>(executor);

        ExecutorService executor2 = Executors.newFixedThreadPool(10);
        ExecutorCompletionService<List> processor2 = new ExecutorCompletionService<List>(executor2);


        //start loading data
        int procCount = 0;
        for (int i = 0; i < 10; i++) {
            Future loadData = processor.submit(new LoadData("Org"));
            procCount++;
        }

        //now all loading tasks have been submitted and are being executed
        System.out.println("All LOADING tasks have been submitted and are being executed");


        //new work queue using the same executor (or another one if you want more parallelism)
        ExecutorCompletionService<Void> converter = new ExecutorCompletionService<Void>(executor);

        while (procCount-- > 0) {
            Future<List> listOfDataFuture = processor.take(); //blocks until a 'LoadData' finishes
            System.out.println("A loading task just completed");
            List listOfData = listOfDataFuture.get();
            for (int i = 0; i < listOfData.size(); i++) {
                ConversionLogic conversion = new ConversionLogic(procCount + "_" + i);
                converter.submit(conversion);
            }
        }
        System.out.println("All LOADING tasks have been COMPLETED for Org");

        //now all conversion tasks have been submitted and are being executed
        System.out.println("All CONVERSION task have been submitted and are being executed for Org:");

        /* You don't need to wait for conversion tasks to complete:
          * they will be completed nonetheless. Wait for them (with take())
          * if you need the results.
         * */    
        executor.shutdown();
        try {
            System.out.println("Waiting for finish");
            executor.awaitTermination(1000, TimeUnit.SECONDS);
            System.out.println("Stopped nicely");
        } catch (InterruptedException e) {
            System.out.println("Could not stop in alloted time");
        }

        System.out.println("Fund Data Loading Starts:");
        //___________________________________________________________________//

        // Some argument to get Fund Data
        int procCount1 = 0;
        for (int i = 0; i < 5; i++) {
            Future loadData = processor2.submit(new LoadData("Fund"));
            procCount1++;
        }

        //now all loading tasks have been submitted and are being executed
        System.out.println("All LOADING tasks have been submitted and are being executed for Fund:");


        //new work queue using the same executor (or another one if you want more parallelism)
        ExecutorCompletionService<Void> converter1 = new ExecutorCompletionService<Void>(executor2);

        while (procCount1-- > 0) {
            Future<List> listOfDataFuture = processor2.take(); //blocks until a 'LoadData' finishes
            System.out.println("A loading task just completed for Fund:");
            List listOfData = listOfDataFuture.get();
            for (int i = 0; i < listOfData.size(); i++) {
                ConversionLogic conversion = new ConversionLogic(procCount + "_" + i);
                converter1.submit(conversion);
            }
        }
        System.out.println("All LOADING tasks have been COMPLETED for Org");

        //now all conversion tasks have been submitted and are being executed
        System.out.println("All CONVERSION task have been submitted and are being executed for Org:");

        /* You don't need to wait for conversion tasks to complete:
          * they will be completed nonetheless. Wait for them (with take())
          * if you need the results.
         * */    
        executor2.shutdown();
        try {
            System.out.println("Waiting for finish");
            executor.awaitTermination(1000, TimeUnit.SECONDS);
            System.out.println("Stopped nicely");
        } catch (InterruptedException e) {
            System.out.println("Could not stop in alloted time");
        }


              System.out.println("<<<<<<<<<< End>>>>>>>>");
              System.exit(0);



    }
}


package Whosebug.test;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;

class LoadData implements Callable {
    String dataType;

    public List call() throws Exception {

        List<String> dataList = new ArrayList<String>();

        for (int i = 0; i < 20; i++) {
            String data = "Data_" + i;
           System.out.println("Processing Data of Type :" + dataType + "Data is:"+data);
            dataList.add(data);
        }
        Thread.sleep(2000);

        return dataList;
    }

    LoadData(String type){
        this.dataType=type;
    }

}

package Whosebug.test;

import java.util.concurrent.Callable;

class ConversionLogic implements Callable {

    private String threadName;

    public ConversionLogic(String threadName) {

        this.threadName = threadName;
    }

    public Void call() throws Exception {
        try {

            System.out.println("Converting Data for Thread starts:" + threadName);
            Thread.sleep(1000);
            System.out.println("Converting Data for Thread ends:" + threadName);


        } catch (InterruptedException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }

        return null;
    }
}

欢迎更新一组 requirement.Any 建议的代码以提高性能。


 package Whosebug.tesst;

import java.sql.SQLException;
import java.util.List;
import java.util.concurrent.*;

import connection.java.JdbcConnection;

class SimpleThreadPool {

    public static void main(String[] args) throws InterruptedException,
            ExecutionException {

        ExecutorService executor = Executors.newFixedThreadPool(10);
        ExecutorCompletionService<List> processor = new ExecutorCompletionService<List>(
                executor);

        ExecutorService executor2 = Executors.newFixedThreadPool(10);
        ExecutorCompletionService<List> processor2 = new ExecutorCompletionService<List>(
                executor2);

        System.out.println("Connecting to DB...");
        try {
            System.out.println("Connection is :"
                    + JdbcConnection.getConnection());
        } catch (ClassNotFoundException e1) {
            // TODO Auto-generated catch block
            e1.printStackTrace();
        } catch (SQLException e1) {
            // TODO Auto-generated catch block
            e1.printStackTrace();
        }

        // start loading data

        int priceRange1 = 200;
        int priceRange2 = priceRange1 + 200;

        int procCount = 0;
        for (int i = 0; i < 10; i++) {

            String query = "select code,name,price from Product where price ";

            if (i == 0) {
                String finalQuery = query + " <= " + priceRange1;
                Future loadData = processor.submit(new LoadData("Org",
                        finalQuery));
            } else {
                String finalQuery = query + " <= " + priceRange2
                        + " and price > " + priceRange1;
                Future loadData = processor.submit(new LoadData("Org",
                        finalQuery));
            }
            priceRange1 = priceRange2;
            priceRange2 = priceRange2 + 200;

            procCount++;
        }

        System.out.println("All LOADING tasks have been COMPLETED for Org");

        ExecutorCompletionService<Void> converter = new ExecutorCompletionService<Void>(
                executor);

        while (procCount-- > 0) {
            Future<List> listOfDataFuture = processor.take();
            System.out.println("A loading task just completed");
            List listOfData = listOfDataFuture.get();
            for (int i = 0; i < listOfData.size(); i++) {
                ConversionLogic conversion = new ConversionLogic(procCount
                        + "_" + i, listOfData);
                converter.submit(conversion);
            }
        }

        System.out
                .println("All CONVERSION task have been submitted and are being executed for Org:");

        executor.shutdown();
        try {
            System.out.println("Waiting for finish");
            executor.awaitTermination(1000, TimeUnit.SECONDS);
            System.out.println("Stopped nicely for Org");
        } catch (InterruptedException e) {
            System.out.println("Could not stop in alloted time");
        }

        System.out.println("<<<<<<<<<< End>>>>>>>>");
        System.exit(0);

    }
}

package Whosebug.tesst;

import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;

----------------------------------------------
import connection.java.JdbcConnection;

class LoadData implements Callable {
    String dataType;
    Connection conn;
    String query;

    public List call() throws Exception {
        List<Product> dataList = new ArrayList<Product>();
        try {
             conn=JdbcConnection.getConnection();
            Statement stmt = conn.createStatement();
            ResultSet rs = stmt.executeQuery(this.query);
            while(rs.next()){
                System.out.println(rs.getString("code"));
                System.out.println(rs.getString("name"));
                System.out.println(rs.getInt("price"));
                Product p= new Product(rs.getString("code"),rs.getString("name"),rs.getInt("price"));
                dataList.add(p);
            }
            rs.close();//conn.close();
        } catch (ClassNotFoundException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        } catch (SQLException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }

      Thread.sleep(2000);

        return dataList;
    }

    LoadData(String type,String query){
        this.dataType=type;
        this.query=query;
    }

}


    }
}

---------------------------
package Whosebug.tesst;

import java.util.List;
import java.util.concurrent.Callable;

class ConversionLogic implements Callable {

    private String threadName;
    List<Product> productList;

    public ConversionLogic(String threadName,List<Product> productList) {

        this.threadName = threadName;
        this.productList=productList;
    }

    public Void call() throws Exception {
        try {

            System.out.println("Converting Data for Thread starts:" + threadName);
            Thread.sleep(1000);
            int listSize=productList.size();
            for(int i=0;i<listSize;i++){
                //Do conversion for product let say
                System.out.println("Print product in Conversion:"+productList.get(i).getPrice());
            }
            System.out.println("Converting Data for Thread ends:" + threadName);


        } catch (InterruptedException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }

        return null;
    }
}
------------------------------------
package connection.java;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;

public class JdbcConnection {

    static Connection conn;
    static String user;
    static String pass;
    static String dbURL;

    public static Connection getConnection() throws ClassNotFoundException,
            SQLException {
        Class.forName("org.postgresql.Driver");

        dbURL = "jdbc:postgresql://localhost:5433:postgres";
        user = "postgres";
        pass = "root";
        Connection conn = DriverManager.getConnection(dbURL, user, pass);
        Statement stmt = conn.createStatement();
        System.out.println("Created DB Connection....");
        return conn;

    }

}

package Whosebug.tesst;

import java.io.Serializable;
import java.math.BigDecimal;



public class Product implements Serializable {

    /**
     * Product Class using Annotation
     */
    private static final long serialVersionUID = 1L;
    private Integer id;
    private String code;
    private String name;
    private int price;

    Product(String code,String name,int price){
        this.code=code;
        this.name=name;
        this.price=price;
    }

   public Integer getId() {
        return id;
    }
    public void setId(Integer id) {
        this.id = id;
    }


    public String getCode() {
        return code;
    }
    public void setCode(String code) {
        this.code = code;
    }


    public String getName() {
        return name;
    }
    public void setName(String name) {
        this.name = name;
    }


    public int getPrice() {
        return price;
    }
    public void setPrice(int price) {
        this.price = price;
    }

    @Override
    public String toString() {
        return "Product [id=" + id + ", code=" + code + ", name="
                + name + ", price=" + price + "]";
    } 

}

这是我正在寻找的 POC 的最终版本。唯一需要的建议是我是否应该同步插入查询部分?

package Whosebug.tesst;

import java.sql.SQLException;
import java.util.List;
import java.util.concurrent.*;

import connection.java.JdbcConnection;

class SimpleThreadPool {

    public static void main(String[] args) throws InterruptedException,
            ExecutionException {

        ExecutorService executor = Executors.newFixedThreadPool(10);
        ExecutorCompletionService<List> processor = new ExecutorCompletionService<List>(
                executor);

        ExecutorService executor2 = Executors.newFixedThreadPool(10);
        ExecutorCompletionService<List> processor2 = new ExecutorCompletionService<List>(
                executor2);

        /*System.out.println("Connecting to DB...");
        try {
            System.out.println("Connection is :"
                    + JdbcConnection.getConnection());
        } catch (ClassNotFoundException e1) {
            // TODO Auto-generated catch block
            e1.printStackTrace();
        } catch (SQLException e1) {
            // TODO Auto-generated catch block
            e1.printStackTrace();
        }*/

        // start loading data

        int priceRange1 = 200;
        int priceRange2 = priceRange1 + 200;

        int procCount = 0;
        for (int i = 0; i < 10; i++) {

            String query = "select code,name,price from Product where price ";

            if (i == 0) {
                String finalQuery = query + " <= " + priceRange1 + " order by price";
                Future loadData = processor.submit(new LoadData("Org",
                        finalQuery));
            } else {
                String finalQuery = query + " <= " + priceRange2
                        + " and price > " + priceRange1 + " order by price";
                Future loadData = processor.submit(new LoadData("Org",
                        finalQuery));
            }
            priceRange1 = priceRange2;
            priceRange2 = priceRange2 + 200;

            procCount++;
        }

        System.out.println("All LOADING tasks have been COMPLETED for Org");

        ExecutorCompletionService<Void> converter = new ExecutorCompletionService<Void>(
                executor);

        while (procCount-- > 0) {
            Future<List> listOfDataFuture = processor.take();
            System.out.println("A loading task just completed");
            List listOfData = listOfDataFuture.get();
            for (int i = 0; i < listOfData.size(); i++) {
                ConversionLogic conversion = new ConversionLogic("<<Org>>"
                        + "_" + i, listOfData);
                converter.submit(conversion);
            }
        }

        System.out
                .println("All CONVERSION task have been submitted and are being executed for Org:");

        executor.shutdown();
        try {
            System.out.println("Waiting for finish");
            executor.awaitTermination(1000, TimeUnit.SECONDS);
            System.out.println("Stopped nicely for Org");
        } catch (InterruptedException e) {
            System.out.println("Could not stop in alloted time");
        }

        System.out.println("Fund Data Loading Starts:");
        // ___________________________________________________________________//


        int fundRange1 = 200;
        int fundRange2 = fundRange1 + 200;


        int procCount1 = 0;
        for (int i = 0; i < 10; i++) {

            String query = "select code,name,price from Product where price ";

            if (i == 0) {
                String finalQuery = query + " <= " + fundRange1;
                Future loadData = processor2.submit(new LoadData("Fund",
                        finalQuery));
            } else {
                String finalQuery = query + " <= " + fundRange2
                        + " and price > " + fundRange1;
                Future loadData = processor2.submit(new LoadData("Fund",
                        finalQuery));
            }
            fundRange1 = fundRange2;
            fundRange2 = fundRange2 + 200;

            procCount1++;
        }

        System.out.println("All LOADING tasks have been COMPLETED for Fund");

        ExecutorCompletionService<Void> converter1 = new ExecutorCompletionService<Void>(
                executor2);

        while (procCount1-- > 0) {
            Future<List> listOfDataFuture = processor2.take();
            System.out.println("A loading task just completed");
            List listOfData = listOfDataFuture.get();
            for (int i = 0; i < listOfData.size(); i++) {
                ConversionLogic conversion = new ConversionLogic("<<Fund>>"
                        + "_" + i, listOfData);
                converter1.submit(conversion);
            }
        }

        System.out
                .println("All CONVERSION task have been submitted and are being executed for Fund:");

        executor2.shutdown();
        try {
            System.out.println("Waiting for finish");
            executor.awaitTermination(1000, TimeUnit.SECONDS);
            System.out.println("Stopped nicely for Fund");
        } catch (InterruptedException e) {
            System.out.println("Could not stop in alloted time");
        }



        System.out.println("<<<<<<<<<< End>>>>>>>>");
        System.exit(0);

    }
}


package Whosebug.tesst;

import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;

import connection.java.JdbcConnection;

class LoadData implements Callable {
    String dataType;
    Connection conn;
    String query;

    public List call() throws Exception {
        List<Product> dataList = new ArrayList<Product>();
        try {
            System.out.println("Connection establishing for Loading Org Data:");
            conn = JdbcConnection.getConnection();
            Statement stmt = conn.createStatement();
            ResultSet rs = stmt.executeQuery(this.query);
            while (rs.next()) {
                System.out.println(rs.getString("code"));
                System.out.println(rs.getString("name"));
                System.out.println(rs.getInt("price"));
                Product p = new Product(rs.getString("code"),
                        rs.getString("name"), rs.getInt("price"));
                dataList.add(p);
            }
            rs.close();
            conn.close();
            System.out.println("Connection Closed While loading for :"+dataType);
        } catch (ClassNotFoundException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        } catch (SQLException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }

        Thread.sleep(2000);

        return dataList;
    }

    LoadData(String type, String query) {
        this.dataType = type;
        this.query = query;
    }

}


package Whosebug.tesst;

import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.List;
import java.util.concurrent.Callable;

import connection.java.JdbcConnection;

class ConversionLogic implements Callable {

    private String threadName;
    List<Product> productList;
    Connection conn;

    public ConversionLogic(String threadName, List<Product> productList) {

        this.threadName = threadName;
        this.productList = productList;
    }

    public Void call() throws Exception {
        int listSize = productList.size();
        try {

            conn = JdbcConnection.getConnection();
            System.out.println("Connection establishing for Converting Org Data:");
            String insertTableSQL = "INSERT INTO item"
                    + "(code, name, price) VALUES" + "(?,?,?)";

            PreparedStatement preparedStatement = conn
                    .prepareStatement(insertTableSQL);


            for (int i = 0; i < listSize; i++) {

                preparedStatement.setString(1, productList.get(i)
                        .getCode());
                preparedStatement.setString(2, productList.get(i)
                        .getName());
                preparedStatement.setInt(3, productList.get(i)
                        .getPrice());
                //Guess we suppose to synchronize the insert part in case 
                // In case mutiple threads trying to insert some records in a table and we might end up loosing data
                //
                preparedStatement.executeUpdate();


            }


            preparedStatement.close();
            conn.close();
            System.out.println("Connection Closed While Converting for Org :");
        } catch (ClassNotFoundException e1) {
            // TODO Auto-generated catch block
            e1.printStackTrace();
        } catch (SQLException e1) {
            // TODO Auto-generated catch block
            e1.printStackTrace();
        }

        return null;
    }
}

我建议您使用内置的 await 函数来代替 while 循环:

    executor.shutdown();
    try {
        System.out.println("Waiting for finish");
        executor.awaitTermination(1000, TimeUnit.SECONDS);
        System.out.println("Stopped nicely");
    } catch (InterruptedException e) {
        System.out.println("Could not stop in alloted time");
    }

    System.exit(0);

将这段代码放在任何你想要确保你的 executorService 在你继续之前完成的地方。

您的代码中有很多线程池的方法,一般来说,在生成的线程中生成新线程不是一个好主意:它很容易失控。在您的情况下,您不需要 WorkerThread:您已经拥有 Java 框架 (ExecutorService) 提供的线程池。

由于您需要处理线程 (LoadData) 的结果 (ConversionLogic),我还会使用 ExecutorCompletionService 来帮助从 [=12= 收集结果].

遵循重构代码。我已经放弃了 WorkerThread 并只使用了一个线程池(尽管如果你想要更多的并行性你可以使用两个),而且 ConversionLogic 现在实现了 Callable 以便它可以很容易地被线程池处理。

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.*;

class SimpleThreadPool {

    public static void main(String[] args) throws InterruptedException, ExecutionException {
        ExecutorService executor = Executors.newFixedThreadPool(10);
        ExecutorCompletionService<List> processor = new ExecutorCompletionService<List>(executor);


        //start loading data
        int procCount = 0;
        for (int i = 0; i < 10; i++) {
            Future loadData = processor.submit(new LoadData());
            procCount++;
        }
        //now all loading tasks have been submitted and are being executed
        System.out.println("All LOADING tasks have been submitted and are being executed");


        //new work queue using the same executor (or another one if you want more parallelism)
        ExecutorCompletionService<Void> converter = new ExecutorCompletionService<Void>(executor);

        while (procCount-- > 0) {
            Future<List> listOfDataFuture = processor.take(); //blocks until a 'LoadData' finishes
            System.out.println("A loading task just completed");
            List listOfData = listOfDataFuture.get();
            for (int i = 0; i < listOfData.size(); i++) {
                ConversionLogic conversion = new ConversionLogic(procCount + "_" + i);
                converter.submit(conversion);
            }
        }
        System.out.println("All LOADING tasks have been COMPLETED");

        //now all conversion tasks have been submitted and are being executed
        System.out.println("All CONVERSION task have been submitted and are being executed");

        /* You don't need to wait for conversion tasks to complete:
          * they will be completed nonetheless. Wait for them (with take())
          * if you need the results.
         * */    
        executor.shutdown();
        System.out.println(" End.");


    }
}

class LoadData implements Callable {

    public List call() throws Exception {

        List<String> dataList = new ArrayList<String>();

        for (int i = 0; i < 20; i++) {
            String data = "Data_" + i;
            System.out.println("Data Added in List:" + data);
            dataList.add(data);
        }
        Thread.sleep(2000);

        return dataList;
    }

}


class ConversionLogic implements Callable {

    private String threadName;

    public ConversionLogic(String threadName) {

        this.threadName = threadName;
    }

    @Override
    public Void call() throws Exception {
        try {

            System.out.println("Converting Data for Thread starts:" + threadName);
            Thread.sleep(1000);
            System.out.println("Converting Data for Thread ends:" + threadName);


        } catch (InterruptedException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }

        return null;
    }
}

executor.awaitTermination() 方法听起来很方便,但这不是一个有效的方法,因为如果您的所有执行程序服务完成 before/after 1000 秒(或您在 awaitTermination( ) 方法)。要么你在浪费时间,要么不让你的服务完成他们的任务。 相反,我建议您使用 CountDownLatch,如下例所述。

public class app {

public static void main(String[] args) throws InterruptedException {

    CountDownLatch latch = new CountDownLatch(3); // here assuming you know number of threads
    ExecutorService executor = Executors.newFixedThreadPool(3);
    for (int i = 0; i < 3; i++) {
        executor.submit(new processor(latch));
    }
    latch.await();
    System.out.println("main resume");

}

}


class processor extends Thread{
    private CountDownLatch latch;

    processor(CountDownLatch latch){
        this.latch=latch;
    }

    public void run(){
        System.out.println("in thread");
            try {
                Thread.sleep(5000);
            } catch (InterruptedException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }
        latch.countDown();
    }
}

因此,您不会浪费一毫秒的时间。