如何让我的多线程 server/client 聊天程序向所有使用套接字的客户端回显消息?

How do I get my multithreaded server/client chat program to echo messages to all clients using sockets?

现在我有一个 java 程序,它使用线程和套接字来像真正的聊天一样回显文本响应 window。目前,我的程序由 运行 服务器和我想要的客户端运行。当客户端输入一条消息时,该消息会回显到服务器以及发送该消息的客户端。

我的问题是我希望任何客户端输入的消息不仅发送给服务器和他们自己,还发送给所有其他客户端。

这是目前的运作方式:

服务器:

收到客户端消息:test1

客户 1:

输入消息:test1

测试1

客户 2:

输入消息:

客户端1进入test1,接收回test1,服务端也接收到test1。客户 2 什么也得不到。我的目标是让客户端中输入的任何消息都显示在发送消息的客户端以及其他客户端和服务器上。

工作示例:

服务器:

收到客户端消息:test1

收到客户端消息:你好

客户 1:

输入消息:test1

测试1

来自客户 2:你好

客户 2:

输入消息:

来自客户端 1:test1

你好

格式不必完全像那样,但就是这样。到目前为止,我的代码如下。我读到我需要将我的客户添加到列表中,然后遍历他们并向他们发送所有消息,但我不确定。任何帮助都会很棒。

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.PrintWriter;
import java.net.Socket;
import java.net.SocketTimeoutException;
import java.util.Scanner;

public class EchoMultiThreadClient {

    public static void main(String[] args) {
        try (Socket socket = new Socket("localhost", 4000)) {
            
            //socket.setSoTimeout(5000);
            BufferedReader br = new BufferedReader(
                    new InputStreamReader(socket.getInputStream()));
            PrintWriter pw = new PrintWriter(socket.getOutputStream(), true);

            Scanner scanner = new Scanner(System.in);
            String echoString;
            String response;

            do {
                System.out.println("Enter string to be echoed: ");
                echoString = scanner.nextLine();

                pw.println(echoString);
                if(!echoString.equals("exit")) {
                    response = br.readLine();
                    System.out.println(response);
                }                          
                
                
            } while(!echoString.equals("exit"));
            
       // }catch(SocketTimeoutException e) {
        //  System.out.println("The Socket has been timed out");

        } catch (IOException e) {
            System.out.println("Client Error: " + e.getMessage());

        }
    }
    
}   

服务器代码

    import java.io.BufferedReader;
    import java.io.IOException;
    import java.io.InputStreamReader;
    import java.io.PrintWriter;
    import java.net.ServerSocket;
    import java.net.Socket;
import java.util.Vector;

    public class EchoMultiThreadServer {
        private static Vector<Echoer> clients = new Vector<Echoer>();
        public static void main(String [] args) {
            try(ServerSocket serverSocket = new ServerSocket(4000)){
                while(true) {                                           
                        
                         Socket socket = serverSocket.accept();
                         Echoer echoer = new Echoer(socket);
                         echoer.start(); 
                         clients.add(echoer);

                        } 
                    }catch(IOException e) {
                        System.out.println("Server Exception"+e.getMessage());
                }
                
        }
}

线程代码

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.PrintWriter;
import java.net.Socket;

public class Echoer extends Thread{
    
    private Socket socket;
    public Echoer(Socket socket) {
        this.socket = socket;
    }
    
    @Override
    public void run() {
        try {
            BufferedReader in = new BufferedReader(new InputStreamReader(socket.getInputStream()));
            PrintWriter wr = new PrintWriter(socket.getOutputStream(), true);
            
            while(true) {
                
                String echoString = in.readLine();
                System.out.println("Received Client Input: " + echoString);
                if(echoString.equals("exit")) {
                    break;
                }

                wr.println(echoString);
            }
        }catch(IOException e) {
            System.out.println("Oooops " + e.getMessage());
        }finally {
            try {
                socket.close();
            }catch(IOException e) {
                // later
            }
            
        }
        
    }

}

我可以看出您当前的逻辑有两个问题:

  1. 在客户端,您实际上是在读取用户输入,然后发送到服务器并获得(单个)响应。所以这里的问题是你只会得到一个响应,而你应该为每个用户输入行接受多个响应:即用户的输入加上其他用户的输入。由于您不知道其他用户的输入时间和数量,因此您需要异步。我的意思是您需要 2 个线程:一个用于读取用户输入,另一个用于读取服务器 input/response(注意:我们仍在客户端)。由于您已经拥有 2 个线程之一,即运行 main 方法的线程,因此您可以使用它而不是创建一个新线程。
  2. 在服务器端,您的 Echoer 正在读取用户输入,但仅将其发送回同一客户端。例如,您还需要一个循环来将客户端的输入发送给所有其他客户端。

所以在我看来正确的逻辑是:

客户端:

正在读取服务器的响应线程逻辑:

forever, do:
    get server's message.
    print server's message to user.

main方法:

connect to server.
start a "Reading server's responses thread".
get user input.
while the user's input it not "exit", do:
    send user's input to server.
    get user input.
disconnect from server.

服务器端:

Echoer 话题:

forever, do:
    read client's message.
    for every client, do:
        send the message to the client.

main方法:

start server socket.
forever, do:
    accept incoming connection.
    start an Echoer thread for the accepted connection.

虽然有一些缺失,例如如何维护所有客户端的列表,但为此我可以看到您已经在服务器端使用 Vector<Echoer> clients。因此,只需将 Vector 传递给您创建的每个 Echoer,这样他们就可以广播每条传入消息。 此处重要说明:在服务器端,您有多个线程:主线程和每个 Echoer,因此请确保在 Vector 上同步您正在主线程修改它,同时也在 Echoers.

广播

备注:

  1. 我假设在上述所有逻辑中,客户端发送消息没有特定的顺序。例如,如果始终客户端 A 先发送,然后客户端 B 等等,并且整个过程在重复,那么你就不需要完全使用多线程。
  2. 请慢慢来。先实现它,然后如果你遇到任何问题告诉我。

编辑 1:完整示例代码。

客户代码:

import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.io.OutputStreamWriter;
import java.net.Socket;
import java.nio.charset.StandardCharsets;
import java.util.Scanner;

public class Client {
    
    //This is the "Reading server's responses thread" I am talking about in the answer.
    private static class ReadingRunnable implements Runnable {
        
        private final BufferedReader serverInput;
        
        public ReadingRunnable(final InputStream is) {
            serverInput = new BufferedReader(new InputStreamReader(is, StandardCharsets.UTF_8));
        }
        
        @Override
        public void run() {
            try {
                //While the server is not disconnected, we print each line to 'System.out':
                for (String line = serverInput.readLine(); line != null; line = serverInput.readLine())
                    System.out.println(line);
            }
            catch (final IOException iox) {
                iox.printStackTrace(System.out);
            }
            finally {
                System.out.println("Input from server stopped.");
            }
        }
    }
    
    public static void main(final String[] args) {
        try {
            System.out.print("Connecting... ");
            try (final Socket sck = new Socket("localhost", 50505);
                 final OutputStream os = sck.getOutputStream();
                 final InputStream is = sck.getInputStream()) {
                System.out.println("Connected.");
                new Thread(new ReadingRunnable(is)).start();
                final BufferedWriter bw = new BufferedWriter(new OutputStreamWriter(os, StandardCharsets.UTF_8));
                final Scanner scan = new Scanner(System.in);
                for (String userInput = scan.nextLine(); !"exit".equalsIgnoreCase(userInput); userInput = scan.nextLine()) {
                    bw.write(userInput);
                    bw.newLine();
                    bw.flush();
                }
            }
        }
        catch (final IOException iox) {
            iox.printStackTrace(System.out);
        }
        finally {
            System.out.println("Output from user stopped.");
        }
    }
}

服务器代码:

import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.io.OutputStreamWriter;
import java.net.ServerSocket;
import java.net.Socket;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Objects;

public class Server {
    
    private static class Echoer implements Runnable {
        private final ArrayList<Echoer> all;
        private final BufferedWriter bw;
        private final BufferedReader br;
        
        public Echoer(final ArrayList<Echoer> all,
                      final InputStream is,
                      final OutputStream os) {
            this.all = Objects.requireNonNull(all);
            bw = new BufferedWriter(new OutputStreamWriter(os, StandardCharsets.UTF_8));
            br = new BufferedReader(new InputStreamReader(is, StandardCharsets.UTF_8));
        }
        
        //Instead of exposing 'bw' via a getter, I just built a helper method to send a message to the Echoer:
        public void send(final String msg) throws IOException {
            bw.write(msg);
            bw.newLine();
            bw.flush();
        }
        
        @Override
        public void run() {
            try {
                for (String line = br.readLine(); line != null; line = br.readLine()) {
                    System.out.println(line); //Print the received line at the server.
                    synchronized (all) { //We are reading from a collection which may be modified at the same time by another (the main) Thread, so we need to synchronize.
                        //Broadcast the received line:
                        for (int i = all.size() - 1; i >= 0; --i) {
                            try {
                                all.get(i).send(line);
                            }
                            catch (final IOException iox) {
                                all.remove(i); //In case we cannot send to the client, disconnect him, ie remove him from the list in this simple case.
                            }
                        }
                    }
                }
            }
            catch (final IOException iox) {
            }
            finally {
                synchronized (all) {
                    all.remove(this); //Disconnect him, ie remove him from the list in this simple case.
                }
                System.out.println("Client disconnected.");
            }
        }
    }
    
    public static void main(final String[] args) throws IOException {
        System.out.print("Starting... ");
        try (final ServerSocket srv = new ServerSocket(50505)) {
            final ArrayList<Echoer> all = new ArrayList<>();
            System.out.println("Waiting for clients...");
            while (true) {
                final Socket sck = srv.accept();
                try {
                    final OutputStream os = sck.getOutputStream();
                    final InputStream is = sck.getInputStream();
                    final Echoer e = new Echoer(all, is, os); //Pass all the Echoers at the new one.
                    synchronized (all) { //We will write to a collection which may be accessed at the same time by another (an Echoer) Thread, so we need to synchronize.
                        all.add(e); //Update list of Echoers.
                    }
                    new Thread(e).start(); //Start serving Echoer.
                }
                catch (final IOException iox) {
                    System.out.println("Failed to open streams for a client.");
                }
            }
        }
    }
}