Java 套接字:我的服务器 class 不断接收输入,但我的客户端没有?

Java Socketing: My server class constantly takes in input but my client doesn't?

尝试在服务器和客户端之间进行一些并发消息传递。当他们第一次连接到彼此并且服务器发送测试字符串时,客户端第一次就可以很好地完成它。客户端可以向服务器发送消息。但是我的客户端 class 无法像我的服务器那样不断检查消息,并且不知道出了什么问题。有什么建议吗?

服务器class代码:

import java.lang.*;
import java.io.*;
import java.net.*;
import java.util.Random;
import java.util.concurrent.*;
import java.util.logging.Level;
import java.util.logging.Logger;

public class Server {

String testMessage = "You are now connected and can begin chatting!";
boolean connected = false;
int port;


public Server(int port) {
    this.port = port;
}

public void Open() {

    //creates Threadpool for multiple instances of chatting
    final ExecutorService clientProcessingPool = Executors.newFixedThreadPool(10);
    Runnable serverTask = new Runnable() {

        @Override
        public void run() {
            try {
                System.out.println("Opening...");

                ServerSocket srvr = new ServerSocket(port);
                while (true) {
                    Socket skt = srvr.accept();
                    clientProcessingPool.submit(new ClientTask(skt));
                }
            } catch (Exception e) {
                try {
                    System.out.println(e);
                    System.out.print("You're opening too many servers in the same location, fool!\n");
                    ServerSocket srvr = new ServerSocket(port);
                    while (true) {
                        Socket skt = srvr.accept();
                        clientProcessingPool.submit(new ClientTask(skt));
                    }
                } catch (IOException ex) {
                    Logger.getLogger(Server.class.getName()).log(Level.SEVERE, null, ex);
                }
            }
        }
    };
    Thread serverThread = new Thread(serverTask);
    serverThread.start();
}

private class ClientTask implements Runnable {

    private final Socket skt;

    private ClientTask(Socket skt) {
        this.skt = skt;
    }

    @Override
    public void run() {
        //for sending messages
        if (connected == false) {
            System.out.println("======================");
            System.out.println("Server has connected!");
            processMessage(testMessage);
        }
        //for receiving messages
        while (true) {
            try {
                // Read one line and output it
                BufferedReader br = new BufferedReader(new InputStreamReader(skt.getInputStream()));
                String incomingMessage = br.readLine();
                if (incomingMessage != null) {
                    System.out.println("Server: Received message: " + incomingMessage);
                    processMessage(incomingMessage);
                }
                //br.close();
                //skt.close(); //maybe delete
            } catch (Exception e) {
                System.out.println("Server had error receiving message.");
                System.out.println("Error: " + e);
            }
        }
    }

    //for processing a message once it is received
    public void processMessage(String message) {
        PrintWriter out = null;
        try {
            out = new PrintWriter(skt.getOutputStream(), true);
        } catch (IOException ex) {
            System.out.println(ex);
            System.out.println("Server had error sending message.");
        }
        System.out.print("Server: Sending message: " + message + "\n");
        out.print(message);
        out.flush();
        connected = true;
        try {
            skt.shutdownOutput();
            //out.close();
        } catch (IOException ex) {
            Logger.getLogger(Server.class.getName()).log(Level.SEVERE, null, ex);
        }

    }
}
}

客户class代码:

import java.lang.*;
import java.io.*;
import java.net.*;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.logging.Level;
import java.util.logging.Logger;
import javax.swing.JOptionPane;

class Client {

public String message;
Socket skt;
public int port;

public Client(int port) {
    this.port = port;
}

//for receiving messages from Server
public void receiveMessage() {
    final ExecutorService clientProcessingPool = Executors.newFixedThreadPool(10);
    Runnable serverTask = new Runnable() {
        @Override
        public void run() {
            try {
                skt = new Socket(InetAddress.getLocalHost().getHostName(), port);
                while (true) {

                    clientProcessingPool.submit(new Client.ClientTask(skt));
                }
            } catch (IOException ex) {
                Logger.getLogger(Client.class.getName()).log(Level.SEVERE, null, ex);
            }

        }
    };
    Thread serverThread = new Thread(serverTask);
    serverThread.start();
}

//for sending messages to Server
public void sendMessage(String outgoingMessage) throws IOException {
    try {
        skt = new Socket(InetAddress.getLocalHost().getHostName(), port);
        PrintWriter pw = new PrintWriter(skt.getOutputStream());
        System.out.println("Client: Sending message: " + outgoingMessage);
        pw.print(outgoingMessage);
        pw.flush();
        skt.shutdownOutput();
        //skt.close(); //maybe delete
    } catch (Exception e) {
        System.out.println(e);
        System.out.print("Client had error sending message.\n");
        JOptionPane.showMessageDialog(null, "That User is not currently online.", "ERROR!!", JOptionPane.INFORMATION_MESSAGE);
    }

}

private class ClientTask implements Runnable {

    private final Socket skt;

    private ClientTask(Socket skt) {
        this.skt = skt;
    }

    @Override
    public void run() {
        while (true) {
            try {
                BufferedReader in = new BufferedReader(new InputStreamReader(skt.getInputStream()));
                //while (!in.ready()) {}
                String incomingMessage = in.readLine();
                if (incomingMessage != null) {
                    System.out.println("Client: Received message: " + incomingMessage); // Read one line and output it
                    message = incomingMessage;
                }
                //skt.shutdownInput();
                //in.close();
                //skt.close(); //maybe delete
            } catch (Exception e) {
                System.out.print("Client had error receiving message.\n");
            }
        }
    }
}
}

流无法重新包装。一旦分配给包装器,它们就必须在流的整个生命周期中使用该包装器。您也不应该在使用完流之前关闭它,在这种情况下,直到您的客户端和服务器完成通信。

在您当前的代码中,有几次您重新初始化流:

while (true) {
    try {
        //Each loop, this reader will attempt to re-wrap the input stream
        BufferedReader br = new BufferedReader(new InputStreamReader(skt.getInputStream()));

        String incomingMessage = br.readLine();
        if (incomingMessage != null) {
            System.out.println("Server: Received message: " + incomingMessage);
            processMessage(incomingMessage);
        }

        //don't close your stream and socket so early!
        br.close();
        skt.close();
    } catch (Exception e) {
        //...
    }

你明白了;您也可以使用这些知识来查找客户端代码中的流问题。

话虽如此,服务器是多个客户端之间的中间人。如果您希望能够在服务器的控制台中键入以向客户端发送消息,则它不应该只发送给 1 个客户端(除非您有一个允许您指定名称的系统)。您需要将每个连接存储在某种集合中,这样当您在服务器的控制台中键入内容时,它就会转到连接的每个客户端。当客户端想要向每个其他客户端发送消息(全局消息)时,这也有帮助。服务器的主线程主要是接受客户端;我创建了另一个线程以允许您在控制台中输入。

至于您的流,您应该在启动 ClientTask 时创建它们,包括服务器端和客户端:

public class Server {
    private ExecutorService executor = Executors.newFixedThreadPool(10);
    private Set<User> users = new HashSet<>();

    private boolean running;
    private int port;

    public Server(int port) {
        this.port = port;
    }

    public void start() {
        running = true;

        Runnable acceptor = () -> {
            try(ServerSocket ss = new ServerSocket(port)) {
                while(running) {
                     User client = new User(ss.accept());
                     users.add(client);
                     executor.execute(client);
                }
            } catch(IOException e) {
                //if a server is already running on this port;
                //if the port is not open;
                e.printStackTrace();
            }
        };

        Runnable userInputReader = () -> {
            try(Scanner scanner = new Scanner(System.in)) {
                while(running) {
                    String input = scanner.nextLine();

                    for(User user : users) {
                        user.send(input);
                    }
                }
            } catch(IOException e) {
                //problem sending data;
                e.printStackTrace();
            }

        };

        Thread acceptorThread = new Thread(acceptor);
        Thread userThread = new Thread(userInputReader);
        acceptorThread.start();
        userThread.start();
    }

    public void stop() {
        running = false;
    }

    public static void main(String[] args) {
        new Server(15180).start();
        System.out.println("Server started!");
    }
}

run() 方法中应该包装流。

class User implements Runnable {
    private Socket socket;
    private boolean connected;

    private DataOutputStream out; //so we can access from the #send(String) method

    public User(Socket socket) {
        this.socket = socket;
    }

    public void run() {
        connected = true;

        try(DataInputStream in = new DataInputStream(socket.getInputStream())) {
            out = new DataOutputStream(socket.getOutputStream());

             while(connected) {
                 String data = in.readUTF();
                 System.out.println("From client: "+data);
                 //send to all clients
             }
         } catch(IOException e) {
             //if there's a problem initializing streams;
             //if socket closes while attempting to read from it;
             e.printStackTrace();
         }
    }

    public void send(String message) throws IOException {
        if(connected) {
            out.writeUTF(message);
            out.flush();
        }
    }
}

和客户的想法差不多: 1. 连接服务器 2.创建"communication"个线程 3. 创建 "user input" 线程(从控制台接收输入) 4. 启动线程

public class Client {
    private final String host;
    private final int port;

    private boolean connected;
    private Socket socket;

    public Client(String host, int port) {
        this.host = host;
        this.port = port;
    }

    public void start() throws IOException {
        connected = true;
        socket = new Socket(host, port);

        Runnable serverInputReader = () -> {
            try (DataInputStream in = new DataInputStream(socket.getInputStream())) {
                while (connected) {
                    String data = in.readUTF();
                    System.out.println(data);
                }
            } catch (IOException e) {
                // problem connecting to server; problem wrapping stream; problem receiving data from server;
                e.printStackTrace();
            }
        };

        Runnable userInputReader = () -> {
            try (DataOutputStream out = new DataOutputStream(socket.getOutputStream());
                    Scanner scanner = new Scanner(System.in)) {
                while (connected) {
                    String input = scanner.nextLine();
                    out.writeUTF(input);
                }
            } catch (IOException e) {
                //problem wrapping stream; problem sending data;
                e.printStackTrace();
            }
        };

        Thread communicateThread = new Thread(serverInputReader);
        Thread userThread = new Thread(userInputReader);
        communicateThread.start();
        userThread.start();
    }

    public static void main(String[] args) throws IOException {
        new Client("localhost", 15180).start();
    }
}

我在上面的代码中使用了一些您可能不熟悉的东西。它们有助于简化代码的语法:


编辑 当用户连接时,您应该按名称或 ID 存储他们的连接。这样,您就可以将数据发送给特定用户。即使你的客户端运行与服务器在同一台机器上,它仍然是相同的想法:客户端连接到服务器,服务器根据名称或id向客户端发送消息:

            while(running) {
                 User client = new User(ss.accept());
                 users.add(client); //add to set
                 executor.execute(client);
            }

现在,您只是将用户添加到 Set。目前无法从该集合中获取特定值。您需要做的是给它某种 "key"。为了给你一个想法,这里有一个我曾经使用过的旧算法。我有一个充满空槽的数组。当有人连接时,我会寻找第一个空槽。找到空插槽后,我将存储它的数组的索引传递给用户(这将是用户的 ID),然后将用户存储在指定索引处的数组中。当您需要向某人发送消息时,您可以使用 id 访问该特定数组索引,抓取您想要的用户并发送消息:

class Server {
    private int maxConnections = 10;
    private ExecutorService executor = Executors.newFixedThreadPool(maxConnections);
    private User[] users = new User[maxConnections];

    //...

    while(running) {
        Socket socket = ss.accept();

        for(int i = 0; i < users.length; i++) {
            if(users[i] == null) {
                users[i] = new User(socket, i);
                executor.execute(users[i]);
                break;
            }
        }
    }

    //...

    public static void sendGlobalMessage(String message) throws IOException {
        for(User user : users)
            if(user != null)
                user.send(message);
    }

    public static void sendPrivateMessage(String message, int id) {
        User user = users[id];

        if(user != null) {
            user.send(message);
        }
    }
}

class User {
    private Socket socket;
    private int id;

    private DataOutputStream out;

    public User(Socket socket, int id) {
        this.socket = socket;
        this.id = id;
    }

    public void send(String message) throws IOException {
        out.writeUTF(message);
        out.flush();
    }

    public void run() {
        DataInputStream in;
        //wrap in and out streams

        while(connected) {
            String data = in.readUTF();

            //Server.sendGlobalMessage(data);
            //Server.sendPrivateMessage(data, ...);
            sendMessage(data); //sends message back to client
        }
    }
}