限制 java 套接字服务器中的连接数

restricting number of connections in java socket server

我创建了一个 Java 套接字服务器,它在指定的端口上创建一个套接字服务器,然后生成一个 RecordWriter 对象来对从每个连接获得的数据流执行一些操作。

我以端口为 61000 和 numthreads 为 2 启动程序。 我还启动了 3 个客户端来连接它。 在客户端,我可以看到所有 3 个都连接到接收器,但是接收器日志显示只有两个连接。

netstat -an|grep 61000|grep -i ESTABLISHED

结果显示总共有 6 个连接,因为客户端和服务器运行在同一台机器上(3 个连接的两端各显示一行)。我的疑问是:

  1. 既然我把 backlog 设为 2,而且 Executors.newFixedThreadPool(numThreads); 只允许同时处理 2 个客户端,为什么第三个客户端仍然显示已成功连接到 61000 端口上的程序?
  2. server.accept 是在 MyWriter.java 中调用的,而且日志中没有任何记录表明第三个客户端已被接受,那为什么 netstat 仍将它显示为已建立(ESTABLISHED)的连接?

这是我的代码:

MyReceiver.java

package com.vikas;

import java.net.ServerSocket;
import java.io.IOException;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

/**
 * Socket server entry point: opens a {@link ServerSocket} on the configured
 * port and starts a fixed number of {@link MyWriter} workers that accept and
 * serve client connections from that shared socket.
 *
 * <p>Each worker serves one accepted connection at a time, so at most
 * {@code numThreads} connections are being served concurrently. The
 * {@code backlog} argument passed to {@link ServerSocket} only sizes the
 * queue of connections waiting to be accepted; it does not cap how many
 * connections the operating system will complete on the server's behalf.
 */
public class MyReceiver{

    protected int serverPort = -1;
    protected int numThreads = -1;

    protected boolean isStopped = false;
    protected Thread runningThread = null;
    protected ExecutorService threadPool = null;

    protected static Logger logger = LogManager.getLogger(MyReceiver.class);
    // Shared with MyWriter, which calls accept() on this socket.
    protected static ServerSocket serverSocket = null;
    // Remote address (as string) -> "" for every connection currently being served.
    protected static Map<String, String> mapConnections = new ConcurrentHashMap<String, String>();

    /**
     * @param port TCP port the server socket will be bound to by {@link #run(int)}
     */
    public MyReceiver(int port){
        this.serverPort = port;
    }

    /**
     * Opens the server socket and starts {@code numThreads} workers.
     *
     * <p>Returns once the workers have been submitted; the (non-daemon) pool
     * threads keep the JVM alive until {@link #stop()} closes the socket.
     * If the port cannot be opened the error is logged and no worker is started.
     *
     * @param numThreads number of worker threads, also used as the accept backlog
     * @throws IllegalArgumentException if {@code numThreads <= 0}
     */
    public void run(int numThreads){
        this.numThreads = numThreads;
        this.threadPool = Executors.newFixedThreadPool(numThreads);

        try {
            logger.info("Starting server on port " + this.serverPort);
            MyReceiver.serverSocket = new ServerSocket(this.serverPort, numThreads);
        } catch (IOException e) {
            logger.error("Cannot open port " + this.serverPort, e);
            // Without a socket the workers have nothing to accept on.
            this.threadPool.shutdown();
            return;
        }

        // Submit exactly one long-running worker per pool thread. Submitting in
        // an unbounded loop would only pile tasks up in the executor's queue.
        for (int i = 0; i < numThreads; i++) {
            this.threadPool.execute(new MyWriter());
        }
    }


    /**
     * @return {@code true} once {@link #stop()} has been called
     */
    public synchronized boolean isStopped() {
        return this.isStopped;
    }

    /**
     * Marks the server as stopped, closes the server socket (which makes the
     * workers' pending {@code accept()} calls fail so they can exit) and asks
     * the pool to shut down once the running workers finish.
     */
    public synchronized void stop(){
        this.isStopped = true;
        if (MyReceiver.serverSocket != null) {
            try {
                MyReceiver.serverSocket.close();
            } catch (IOException e) {
                logger.error("Error closing server", e);
            }
        }
        if (this.threadPool != null) {
            this.threadPool.shutdown();
            logger.info("Server Stopped after shutdown.");
        }
    }

    /**
     * Command-line entry point.
     *
     * @param args {@code <port> <number of threads>}
     */
    public static void main(String[] args) {
        if(args.length != 2){
            System.out.println("Number of input arguements is not equal to 2.");
            System.out.println("Usage:  java -cp YOUR_CLASSPATH -Dlog4j.configurationFile=/path/to/log4j2.xml com.vikas.MyReceiver  <port>  <number of threads>");
            System.out.println("java -cp \"$CLASSPATH:./MyReceiver.jar:./log4j-api-2.6.2.jar:./log4j-core-2.6.2.jar\" -Dlog4j.configurationFile=log4j2.xml com.vikas.MyReceiver  61000 2");
            return;
        }
        int port = Integer.parseInt(args[0].trim());
        int numThreads = Integer.parseInt(args[1].trim());

        final MyReceiver myConnection = new MyReceiver(port);

        // Register the hook before starting the server so that it is in place
        // for the whole lifetime of the process.
        Runtime.getRuntime().addShutdownHook(new Thread()
        {
            @Override
            public void run()
            {
                logger.info("SocketServer - Receive SIGINT!!!");
                logger.info("Stopping Server");

                if(!myConnection.isStopped()){
                    myConnection.stop();
                }
                logger.info("Server Stopped successfully");

                try
                {
                    // Short grace period for the workers to notice the closed socket.
                    Thread.sleep(1000);
                }
                catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }
        });

        myConnection.run(numThreads);
    }
}

MyWriter.java

package com.vikas;

import java.io.InputStreamReader;
import java.io.BufferedReader;
import java.io.IOException;
import java.net.Socket;
import java.util.Properties;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;



/**
 * Worker task that repeatedly accepts a client connection from
 * {@code MyReceiver.serverSocket} and reads newline-terminated messages from
 * it until the client closes the connection.
 *
 * <p>A worker serves one connection at a time and only calls
 * {@code accept()} again after the current connection has ended.
 */
public class MyWriter implements Runnable{

    protected String topic = null;
    protected String brokers = null;

    protected static Logger logger = LogManager.getLogger(MyWriter.class);

    public MyWriter () {

    }


    /**
     * Accept/serve loop; exits when the shared server socket is closed
     * (or was never opened).
     */
    @Override
    public void run() {
        if (MyReceiver.serverSocket == null) {
            logger.error("Server socket is not open; worker exiting.");
            return;
        }

        while(!MyReceiver.serverSocket.isClosed()){
            Socket client;
            try {
                client = MyReceiver.serverSocket.accept();
            } catch (IOException e) {
                if(MyReceiver.serverSocket.isClosed()) {
                    logger.error("Server was found to be Stopped.");
                    logger.error("Error accepting client connection", e);
                    break;
                }
                // Socket is still open: report the failure and try again.
                logger.error("Error accepting client connection", e);
                continue;
            }
            serve(client);
        }
    }

    /**
     * Reads messages from one client until end of stream, then closes the
     * socket and removes the client from {@code MyReceiver.mapConnections},
     * whether reading ended normally or with an error.
     */
    private void serve(Socket client) {
        // The same string is used to register and to unregister the connection.
        String remoteKey = String.valueOf(client.getRemoteSocketAddress()).trim();
        logger.info("Just connected to " + client.getRemoteSocketAddress());
        MyReceiver.mapConnections.put(remoteKey, "");

        try {
            // NOTE(review): the reader uses the platform default charset — confirm
            // this matches what the clients send.
            BufferedReader in = new BufferedReader(new InputStreamReader(client.getInputStream()));
            StringBuilder msg = new StringBuilder();
            int value;
            while((value = in.read()) != -1){
                char ch = (char)value;
                if(ch == 0x0a){
                    // Line feed ends a message; trim() also drops a preceding '\r'.
                    String message = msg.toString().trim();
                    msg.setLength(0);

                    if(message.length() != 0){
                        //do something
                    }
                    else{
                        logger.error("Blank String received");
                    }
                }
                else{
                    msg.append(ch);
                }
            }
            logger.info("Closing connection for client :" + client.getRemoteSocketAddress());
        } catch (IOException e) {
            logger.error("Something went wrong!!", e);
        }
        finally{
            try {
                // Closing the socket also closes its input stream.
                client.close();
            } catch (IOException e) {
                logger.error("Error closing client connection", e);
            }
            MyReceiver.mapConnections.remove(remoteKey);
        }
    }
}

ServerSocket 构造函数的 backlog 参数限制的是等待被接受的传入连接队列的大小,而不是您可以成功调用 accept() 的总次数。如果想限制活动连接的数量,就需要自己记录已经接受的连接数;达到阈值后,在至少关闭一个活动连接之前不要再次调用 accept()。

while(!MyReceiver.serverSocket.isClosed()){
        Socket server = null;
        try {
            server = MyReceiver.serverSocket.accept();
            //System.out.println("Just connected to " + server.getRemoteSocketAddress());
            logger.info("Just connected to " + server.getRemoteSocketAddress());
            MyReceiver.mapConnections.put(server.getRemoteSocketAddress().toString().trim(), "");

            if (activeConnections == maxConnections) break; // exit accept loop