处理许多客户端的最佳方式(使用线程?)

Best way to handle many clients (with threads?)

所以我的问题在这里。现在,如果我的服务器有超过 20 个客户端,它也有 20 个线程,而我的带有 ryzen CPU 的桌面在 30 个线程时使用率达到 100%。现在我想通过一台服务器处理大量客户端,但 CPU 只是被过度使用了。我的智慧很简单我是怎么做的,但是一定有更好的方法;因为到目前为止我还看到了很多不错的 java 服务器。我不知道我做错了什么。下面分享一下我的代码,原则上是怎么做的。

while(this.isRunning()) {
ServerSocket server = new ServerSocket(8081);
Socket s = server.accept();
new Thread(new WorkerRunnable(s)).start();
//now here if e.g. over 25 users connect there are 25 threads. CPU is at 100%. Is there a better way to handle this?

worker runnable 正在识别客户端。之后他们将进入聊天室。就像群聊一样

编辑:我未完成的代码的相关部分仍然非常 WIP

private boolean state;
private ServerSocket socket;

@Override
public void run() {
    while(this.isRunning()==true) {
        try {
            if(this.socket==null) this.socket = new ServerSocket(this.getPort());
            Socket connection = this.socket.accept();




            IntroductionSession session = new IntroductionSession(this, connection);
            new Thread(session).start();
            //register timeout task for 3 secs and handle it async



            System.out.println(ManagementFactory.getThreadMXBean().getThreadCount());
            //this.handleIncomingConnection(connection);
        } catch(Exception e) {
            e.printStackTrace();
            //System.exit(1);
        }
    }
}

private class IntroductionSession 实现 Runnable { private boolean alive = true;

    private BaseServer server;
    private Socket socket;
    private boolean introduced = false;

    public IntroductionSession(BaseServer server, Socket socket) {
        this.server = server;
        this.socket = socket;
    }

    private void interrupt() {
        System.out.println("Not mroe alive");
        this.alive = false;
    }

    private void killConnection() {
        this.killConnection("no_reason");
    }

    private void killConnection(String reason) {
        try {
            if(this.from_client!=null) this.from_client.close();
            if(this.to_client!=null) this.to_client.close();
            this.socket.close();

            switch(reason) {
                case "didnt_introduce":
                    System.out.println("Kicked connection, cause it didn't introduce itself");
                break;
                case "unknown_type":
                    System.out.println("Kicked unknown connection-type.");
                break;
                case "no_reason":
                default:
                    //ignore
                break;
            }
        } catch (IOException e) {
            switch(reason) {
                case "didnt_introduce":
                    System.out.println("Error at kicking connection, which didn't introduce itself");
                break;
                case "unknown_type":
                    System.out.println("Error at kicking unknown connection-type.");
                break;
                case "no_reason":
                default:
                    System.out.println("Error occured at kicking connection");
                break;
            }

            e.printStackTrace();

        }
    }

    private ObjectInputStream from_client;
    private ObjectOutputStream to_client;

    @Override
    public void run() {
        while(this.alive==true) {
            try {
                if(this.to_client==null) {
                    this.to_client = new ObjectOutputStream(this.socket.getOutputStream());
                    //this.to_client.flush();
                }
                if(this.from_client==null) this.from_client = new ObjectInputStream(this.socket.getInputStream());
                //Time runs now, if socket is inactive its getting kicked
                new Timer().schedule(new java.util.TimerTask() {
                        @Override
                        public void run() {
                            if(IntroductionSession.this.introduced==false) {
                                IntroductionSession.this.killConnection("didnt_introduce");
                                Thread.currentThread().interrupt();
                                IntroductionSession.this.interrupt();
                            }
                        }
                    }, 5000
                );

                Object obj = this.from_client.readObject();
                while(obj!=null) {
                    if(obj instanceof IntroductionPacket) {
                        IntroductionPacket pk = (IntroductionPacket) obj;
                        introduced = true;

                        if(isCompatible(pk)==false) {
                            try {
                                this.to_client.writeObject(new DifferentVersionKickPacket(BaseServer.version));
                                this.to_client.close();
                                this.from_client.close();
                                IntroductionSession.this.socket.close();
                                System.out.println("Kicked socket, which uses another version.");
                            } catch(Exception e) {
                                Thread.currentThread().interrupt();
                                //ignore
                                System.out.println("Error at kicking incompatible socket.");
                                e.printStackTrace();
                            }
                        } else {
                            this.server.handleIncomingConnection(this.socket, this.from_client, this.to_client);
                        }

                        Thread.currentThread().interrupt();
                    }
                }
            } catch(StreamCorruptedException e) {
                //unknown client-type = kick
                this.killConnection("unknown_type");
            } catch (IOException|ClassNotFoundException e) {
                e.printStackTrace();
                this.killConnection("no_reason");
            }/* catch(SocketException e) {

            }*/
        }
        Thread.currentThread().interrupt();
    }
}

扩展 class,这是一个实际的服务器:

@Override
public void handleIncomingConnection(Socket connection, ObjectInputStream from_client, ObjectOutputStream to_client) {
    new AuthenticationSession(connection, from_client, to_client).run();
}

private class AuthenticationSession implements Runnable {
    private Socket socket;
    private ObjectInputStream from_client;
    private ObjectOutputStream to_client;

    public AuthenticationSession(Socket socket, ObjectInputStream from_client, ObjectOutputStream to_client) {
        this.socket = socket;
        this.to_client = to_client;
        this.from_client = from_client;
    }
    //TODO: Implement app id for access tokens
    @Override
    public void run() {
        try {
            while(this.socket.isConnected()==true) {
                /*ObjectOutputStream to_client = new ObjectOutputStream(socket.getOutputStream()); //maybe cause problems, do it later if it does
                ObjectInputStream from_client = new ObjectInputStream(socket.getInputStream());*/

                Object object = from_client.readObject();

                while(object!=null) {
                    if(object instanceof RegisterPacket) {
                        RegisterPacket regPacket = (RegisterPacket) object;

                        System.out.println("Username:" + regPacket + ", password: " + regPacket.password + ", APP-ID: " + regPacket.appId);
                    } else {
                        System.out.println("IP " + this.socket.getInetAddress().getHostAddress() + ":" + this.socket.getPort() + " tried to send an unknown packet.");
                        this.socket.close();
                    }
                }
            }
        }/* catch(EOFException eofe) {
            //unexpected disconnect

        } catch (IOException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        } catch (ClassNotFoundException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }*/
        catch(Exception e) {
            e.printStackTrace();
            System.exit(1);
        }

        /*catch(Exception e) {
            //e.printStackTrace();

            Thread.currentThread().interrupt();
        }*/
    }

}

请不要看它非常糟糕的格式和我希望修复它所做的事情,尽管如此任务不会死。

既然你在做一个聊天应用程序,你需要考虑做一个单线程事件循环。

您可以保留字符串(客户端 ID)和套接字(客户端套接字)的映射。

Map<String, Socket> clientSockets;

您的服务器线程将接受新的客户端套接字并将其放入上面的映射中。然后会有另一个线程执行事件循环,只要 InputStream 中的任何客户端套接字中有数据,它就应该将该数据发送到所有其他客户端套接字(群聊)。这应该在睡眠间隔内无限发生。

通常,在生产级服务器代码中,我们不会直接创建套接字和处理请求。使用低级套接字、关闭连接和防止泄漏是一场噩梦。相反,我们依赖生产级框架,例如 Java Spring Framework or Play Framework

我的问题是,您为什么不使用任何服务器端框架,例如我上面列出的框架?

  1. 如果您想知道这些框架如何处理数千个并发请求,请查看 Thread Pool 等设计模式。这些框架抽象出复杂性并为您处理线程池。

  2. 如果客户不希望立即收到回复,您也可以考虑引入 messaging queue such as Kafka。服务器将从队列中一条一条地挑选消息并进行处理。但是,请记住,这是异步的,可能无法满足您的要求。

  3. 如果您不仅限于一台服务器,您可以考虑将您的服务器代码部署到 Azure 或 AWS VMSS(虚拟机规模集)。根据您配置的CPU负载规则,系统将自动缩放并为您动态管理资源。

我建议阅读与服务器相关的系统设计原则以加强您的理解。

Don't reinvent the wheel.