如何从客户端向服务器发送多个异步请求

How to send multiple of asynchronous requests from client to server

我已经使用套接字构建了简单的客户端-服务器模型。服务器收到 1 种类型的请求:来自客户端的 2 个数字,将它们相加,等待 5 秒并将响应发送回客户端。 我正在尝试从客户端 发送 20 个异步请求而不 等待响应。 客户端应将来自服务器的所有 20 个响应中的所有数字相加。我想了解我应该使用什么以及如何使用?服务器上的线程,或者客户端又如何? 我已经添加了我的客户端和服务器 类。 服务器:

public  class Server {
    public static void main(String[] args) throws IOException {
        try {
            //Make a ServerSocket to listen for message
            ServerSocket ss = new ServerSocket(7777);

            while (true == true) {
                //Accept input from socket
                Socket s = ss.accept();

                //Read input from socket
                InputStreamReader streamReader = new InputStreamReader(s.getInputStream());
                BufferedReader reader = new BufferedReader(streamReader);
                String message = reader.readLine();
                System.out.println(message);
                //parse the json recieved from client and sum the 2 numbers
                Object obj = new JSONParser().parse(String.valueOf(message));
                JSONObject jo = (JSONObject) obj;
                long num1 = (long) jo.get("num1");
                long num2 = (long) jo.get("num2");
                long sum = num1 + num2;
                Thread.sleep(5000);

                //putting response as json
                JSONObject jsonResponse = new JSONObject();
                jsonResponse.put("response", sum);
                //get the message and write it to the socket as response
                PrintWriter writer = new PrintWriter(s.getOutputStream());
                writer.println(jsonResponse);
                //System.out.println(df);
                writer.close();
            }
        } catch (IOException | ParseException | InterruptedException ex) {
            System.out.println(ex);
        }
    }
}

客户:

public  class Client {
    public static void main(String[] args) throws IOException {
        try {
            //this variable will sum all the responses from server
            long sumOfAllResponses = 0;
            
            for(int i = 0 ; i< 20; i++){
                //Create a Socket with ip and port number
                Socket s = new Socket("localhost", 7777);
                Scanner in = new Scanner(System.in);
                PrintWriter writer = new PrintWriter(s.getOutputStream());
                InputStreamReader streamReader = new InputStreamReader(s.getInputStream());
                BufferedReader reader = new BufferedReader(streamReader);
                //Creating json with 2 numbers to be sent to server as a request
                JSONObject jsonRequest = new JSONObject();
                jsonRequest.put("num1", 1);
                jsonRequest.put("num2", 1);
                System.out.println(jsonRequest);
                //Make a printWriter and write the message to the socket
                writer.println(jsonRequest);
                writer.flush();
                //Get the response message from server
                String responseMessage = reader.readLine();
                //parse the response and add the result to the sum variable
                Object obj = new JSONParser().parse(String.valueOf(responseMessage));
                JSONObject jo = (JSONObject) obj;
                sumOfAllResponses += (long) jo.get("response");
            }
            System.out.println(sumOfAllResponses);
        }
        catch (IOException | ParseException ex) {
            ex.printStackTrace(); // (**)
        }
    }
}

服务器端

服务器不应在其主线程上处理请求。 相反,它应该为它收到的每个请求打开处理线程。

我们应该限制并发 运行 线程的数量。

这是简单的代码示例:

public class Server {
    private static final int NUMBER_OF_CONCURRENT_THREADS = 20;
    private ServerSocket serverSocket;
    ExecutorService executor = Executors.newFixedThreadPool(NUMBER_OF_CONCURRENT_THREADS);

    public void start(int port) throws IOException {
        serverSocket = new ServerSocket(port);
        while (true) {

            final Socket clientSocket = serverSocket.accept();
            final ClientHandler clientHandler = new ClientHandler(clientSocket);
            executor.submit(clientHandler);
        }
    }

    public void stop() throws IOException {
        serverSocket.close();
    }

    private static class ClientHandler implements Runnable {
        private Socket clientSocket;

        public ClientHandler(Socket socket) {
            this.clientSocket = socket;
        }

        public void run() {
            // Implement your client logic here
        }
    }
}

客户端

现在您可以从客户端并发发送请求,它们将由服务并发处理(一次最多 20 个请求)

我会把并发请求发送的实现留给你,但这里有一些指导方针:

  • 你可以在它自己的线程中发送每个请求,使用 ExecutorService 来处理这些线程
  • 对于来自服务的每个响应,添加到保存所有响应总和的共享变量。
  • 确保同步访问共享总和,因为多个线程将并行更新它。

异步意味着发送消息而不等待响应。

            //Get the response message from server - so you wait whole 5 seconds for response :)
            String responseMessage = reader.readLine();

在这种情况下,最简单的解决方案是每次都取消等待响应。因此,从 Client class.

中的循环中删除上面的行

在这种特殊的 client-sever 情况下,您不需要额外的线程,如果您要在一个应用程序中执行异步操作,那么就可以了。查看 Java Futures with some tutorial 了解如何使用它们。 但是如果你想从服务器得到一个结果,你无论如何都要等待。并且您想获得所有计算的结果。因此,您必须将所有传入请求存储在某个地方。简单、幼稚、不切实际,但表现出异步概念的代码可能是这样的

public class Client {
  public static void main(String[] args) throws IOException {
    try {
        long start = System.currentTimeMillis();
        BufferedReader reader = null;
        for(int i = 0 ; i < 20; i++){
            Socket s = new Socket("localhost", 7777);
            PrintWriter writer = new PrintWriter(s.getOutputStream());
            InputStreamReader streamReader = new InputStreamReader(s.getInputStream());
            reader = new BufferedReader(streamReader);
            // just make sure you send data and do not wait for response
            System.out.println("Sending " + i  + " : " + (System.currentTimeMillis() - start)); 
            writer.println(i);
            writer.flush();
        }
        //this line works like future.get(), it hangs client until it receives result
        String responseMessage = reader.readLine();
        // process returned data as you want
        System.out.println(responseMessage);
    }
    catch (IOException ex) {
        ex.printStackTrace(); // (**)
    }
  }
}

public class Server {
  public static void main(String[] args) throws IOException {
    try {
        //Make a ServerSocket to listen for message
        ServerSocket ss = new ServerSocket(7777);
        Socket s;
        //we need to store incomming requests to process, and return them
        List<String> integerList = new LinkedList<>();
        while (true) {
            s = ss.accept();

            InputStreamReader streamReader = new InputStreamReader(s.getInputStream());
            BufferedReader reader = new BufferedReader(streamReader);
            String message = reader.readLine();
            System.out.println(message);
            // do something here
            
            Thread.sleep(5000);

            PrintWriter writer = new PrintWriter(s.getOutputStream());
            integerList.add(message);
            writer.println(integerList);
            writer.close();
        }
    } catch (IOException | InterruptedException ex) {
        System.out.println(ex);
    }
  }
}