需要 serversocketchannel 每秒接受 1000 个 TCP 连接

Need ServerSocketChannel to accept 1000 TCP connections per second

我正在使用以下代码连接我的自定义 java nio 服务器:

public static void main(String[] args) {
        try {

String value[] = { "00*********402", "00*********383",.....}
        int i = 0;

            while (i < value.length) {
                RunnableDemo temp = new RunnableDemo(value[i]);
                temp.start();
                i++;
                try {
                    Thread.sleep(1000); //REDUCING THIS TIME CAUSE PROBLEM
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }

        } catch (Exception e) {

            e.printStackTrace();
        }
    }

    /**
     * Test client that runs on its own thread: it connects to the server,
     * sends its client number once, and then prints every line the server
     * sends back until the connection ends.
     */
    class RunnableDemo implements Runnable {

        private static final String SERVER_HOST = "192.168.20.22";

        private static final int SERVER_PORT = 4664;

        // Connection used by run(); closed again before run() returns.
        private Socket socket;

        // Worker thread, created lazily by start().
        private Thread t;

        private final String threadName;// equals with client number

        /**
         * @param phoneNumber client number, used as thread name and as the
         *                    payload sent to the server
         */
        RunnableDemo(int phoneNumber) {
            threadName = String.valueOf(phoneNumber);
            System.err.println("Creating " + threadName);
        }

        /**
         * @param phoneNumber client number, used as thread name and as the
         *                    payload sent to the server
         */
        RunnableDemo(String phoneNumber) {
            threadName = phoneNumber;
            System.err.println("Creating " + threadName);
        }

        /**
         * Connects, sends the client number and echoes received lines to
         * standard output. Returns when the server closes the connection or
         * an error occurs; the socket is always closed on the way out.
         */
        @Override
        public void run() {
            System.err.println("Running " + threadName);
            //socket = new Socket("94.232.174.97", 4664);
            try (Socket connection = new Socket(SERVER_HOST, SERVER_PORT)) {
                socket = connection;

                PrintWriter testWriter = new PrintWriter(new OutputStreamWriter(
                        connection.getOutputStream()));
                testWriter.print(threadName);
                testWriter.flush();

                BufferedReader bufferedIn = new BufferedReader(
                        new InputStreamReader(connection.getInputStream()));
                String incomingMessage;
                // readLine() returns null at end of stream; stop there instead
                // of looping forever on a closed connection.
                while ((incomingMessage = bufferedIn.readLine()) != null) {
                    System.out.println("recived message: " + incomingMessage);
                }

            } catch (Exception e) {
                System.out.println("Thread " + threadName + " interrupted.");
                e.printStackTrace();
            }
            System.out.println("Thread " + threadName + " exiting.");
        }

        /** Placeholder; intentionally does nothing. */
        public void read() {

        }

        /**
         * Starts the worker thread the first time it is called; later calls
         * only print the "Starting" line.
         */
        public void start() {
            System.out.println("Starting " + threadName);
            try {
                if (t == null) {
                    t = new Thread(this, threadName);
                    t.start();
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
        }

    }

当我每隔 1000 毫秒创建一个客户端线程时,它工作正常;但是当我把间隔减少到 100 毫秒(即每秒有 10 个客户端连接到服务器)时,几秒钟后我的客户端线程就会收到以下错误:

java.net.ConnectException: Connection refused: connect
at java.net.DualStackPlainSocketImpl.connect0(Native Method)
at java.net.DualStackPlainSocketImpl.socketConnect(Unknown Source)
at java.net.AbstractPlainSocketImpl.doConnect(Unknown Source)
at java.net.AbstractPlainSocketImpl.connectToAddress(Unknown Source)
at java.net.AbstractPlainSocketImpl.connect(Unknown Source)
at java.net.PlainSocketImpl.connect(Unknown Source)
at java.net.SocksSocketImpl.connect(Unknown Source)
at java.net.Socket.connect(Unknown Source)
at java.net.Socket.connect(Unknown Source)
at java.net.Socket.<init>(Unknown Source)
at java.net.Socket.<init>(Unknown Source)
at RunnableDemo.run(Main.java:419)
at java.lang.Thread.run(Unknown Source)

这也是服务器部分:

/**
 * Single-threaded, selector-driven TCP server. Each client sends its number
 * after connecting; the server remembers which selection key belongs to which
 * number and writes that client's pending message when one is available.
 */
public class EchoServer {

// NOTE(review): the logger is created for Main.class rather than
// EchoServer.class - confirm this is intentional.
static final org.apache.log4j.Logger logger = org.apache.log4j.Logger.getLogger(Main.class);

// Capacity, in bytes, of the shared read buffer.
private static final int BUFFER_SIZE = 1024;

// Port used by the no-argument constructor.
private final static int DEFAULT_PORT = 4664;
// Bind address; resolved from ipAddress in the constructor.
private InetAddress hostAddress = null;

// Port the server socket is bound to.
private int port;
// Textual form of the address the server binds to.
private String ipAddress = "192.168.20.22";
// Selector on which the server channel and all client channels are registered.
private Selector selector;

// The buffer into which we'll read data when it's available.
// It is shared by all connections and cleared before every read.
private ByteBuffer readBuffer = ByteBuffer.allocate(BUFFER_SIZE);

// Starts at 1 and is incremented once per iteration of the main loop.
int timestamp = 1;

// Selection-key hash code -> client number.
HashMap<Integer, String> connectedClients = new HashMap<Integer, String>();
// Client number -> selection-key hash code (reverse of connectedClients).
HashMap<String, Integer> clientIds= new HashMap<String,Integer>();
// Client number -> message waiting to be written to that client.
HashMap<String, String> messageToClients = new HashMap<String, String>();


/**
 * Creates the server on {@code DEFAULT_PORT} by delegating to
 * {@code EchoServer(int)}.
 */
public EchoServer() {
    this(DEFAULT_PORT);
}

/**
 * Creates the server on the given port and immediately enters the event
 * loop on the calling thread. Any exception raised while resolving the bind
 * address or setting up is logged rather than propagated.
 *
 * @param port TCP port to listen on
 */
public EchoServer(int port)  {
    this.port = port;
    try {
        this.hostAddress = InetAddress.getByName(this.ipAddress);
        this.selector = initSelector();
        // Runs the select loop; loop() contains an endless while(true).
        loop();
    } catch (Exception failure) {
        logger.error("Exception Accoured:", failure);
    }
}

/**
 * Opens the selector and a non-blocking server channel bound to
 * {@code hostAddress:port}, and registers the channel for accept events.
 *
 * @return the ready selector, or {@code null} if any step failed (the
 *         failure is logged and everything opened so far is closed again)
 */
private Selector initSelector()  {
    Selector socketSelector = null;
    ServerSocketChannel serverChannel = null;
    try{
        socketSelector = SelectorProvider.provider().openSelector();
        serverChannel = ServerSocketChannel.open();
        serverChannel.configureBlocking(false);

        InetSocketAddress isa = new InetSocketAddress(hostAddress, port);
        serverChannel.socket().bind(isa);
        serverChannel.register(socketSelector, SelectionKey.OP_ACCEPT);

        return socketSelector;
    }catch(Exception ex){
        logger.error("Exception Accoured:",ex);

        // Do not leak the half-initialised channel/selector (and the bound
        // port) when setup fails part-way through.
        if (serverChannel != null) {
            try {
                serverChannel.close();
            } catch (IOException closeEx) {
                logger.warn("Failed to close server channel after setup error", closeEx);
            }
        }
        if (socketSelector != null) {
            try {
                socketSelector.close();
            } catch (IOException closeEx) {
                logger.warn("Failed to close selector after setup error", closeEx);
            }
        }
        return null;
    }
}

/**
 * Main event loop. Each iteration waits for ready keys, dispatches them,
 * reloads the pending messages from the database, sleeps one second and
 * advances {@code timestamp}. Any exception escaping an iteration is printed
 * and terminates the JVM with exit code 1.
 */
private void loop() {
    while (true) {
        try {

            // Do defined operations for clients
            // ------------------------------
            selector.select();
            int handled = dispatchSelectedKeys();
            logger.info(handled + "  keys has been iterated");

            // Fetch List from server
            // -----------------------------------------
            // NOTE(review): this query runs on the selector thread, so no
            // socket is serviced while it executes.
            refreshPendingMessages();

            // Wait for 1 second
            // -----------------------------------------------
            // NOTE(review): this sleep postpones the next select() by a full
            // second, so new connections and incoming data wait that long
            // before being serviced. It is also the only throttle while client
            // keys stay registered for OP_WRITE; dropping it requires
            // registering OP_WRITE only when a message is actually pending,
            // otherwise this loop would spin.
            Thread.sleep(1000);
            timestamp++;

        } catch (Exception e) {
            e.printStackTrace();
            System.exit(1);
        }

    }
}

/**
 * Handles every key in the selector's selected-key set and removes it from
 * the set.
 *
 * @return number of valid keys that were dispatched
 */
private int dispatchSelectedKeys() {
    Iterator<SelectionKey> selectedKeys = selector.selectedKeys().iterator();

    int handled = 0;
    while (selectedKeys.hasNext()) {
        SelectionKey key = selectedKeys.next();
        selectedKeys.remove();

        if (!key.isValid()) {
            logger.warn(key.hashCode() + "- is invalid");
            continue;
        }
        // Check what event is available and deal with it
        if (key.isAcceptable()) {
            accept(key);
        } else if (key.isReadable()) {
            read(key);
        } else if (key.isWritable()) {
            write(key);
        }
        handled++;
    }
    return handled;
}

/**
 * Runs the database query and stores one pending message per mobile number
 * in {@code messageToClients}. Failures are logged and otherwise ignored.
 */
private void refreshPendingMessages() {
    // try-with-resources: the ResultSet was previously never closed.
    try (ResultSet resultset = DataBase.getInstance().getQueryResult()) {

        while (resultset.next()) {
            String mobileNumber = resultset.getString("MobileNo");

            String message = resultset.getInt("IsMessage") + ","
                    + resultset.getInt("IsDeliver") + ","
                    + resultset.getInt("IsGroup") + ","
                    + resultset.getInt("IsSeen");
            messageToClients.put(mobileNumber, message);
        }

    } catch (Exception ex) {
        logger.error("Exception Accoured:",ex);
    }
}

/**
 * Accepts every connection that is currently queued on the server channel,
 * switches each one to non-blocking mode with keep-alive and TCP_NODELAY
 * enabled, and registers it for read events so the client number can be
 * received.
 *
 * @param key the acceptable key of the server socket channel
 */
private void accept(SelectionKey key)  {

    try{
        // Initialize the connection ------------------------------------------
        ServerSocketChannel serverSocketChannel = (ServerSocketChannel) key
                .channel();

        // Drain the whole accept queue on every wake-up. Taking only one
        // connection per event lets the listen backlog overflow whenever
        // clients arrive faster than the event loop comes back here, and the
        // excess clients are refused. In non-blocking mode accept() returns
        // null once nothing is pending.
        SocketChannel socketChannel;
        while ((socketChannel = serverSocketChannel.accept()) != null) {
            try {
                socketChannel.configureBlocking(false);
                socketChannel.setOption(StandardSocketOptions.SO_KEEPALIVE, true);
                socketChannel.setOption(StandardSocketOptions.TCP_NODELAY, true);
                logger.info("New client accepted");

                // Fire read for reading phone number ------------------------
                socketChannel.register(selector, SelectionKey.OP_READ);
            } catch (Exception ex) {
                logger.error("Exception Accoured:",ex);
                // A connection that could not be set up is unusable; close it
                // so its descriptor is not leaked, then keep draining.
                try {
                    socketChannel.close();
                } catch (IOException closeEx) {
                    logger.warn("Failed to close rejected client channel", closeEx);
                }
            }
        }
    }catch(Exception ex){
        logger.error("Exception Accoured:",ex);
    }
}

/**
 * Reads the client number sent by a newly connected client, records the
 * key/number association in {@code connectedClients} and {@code clientIds},
 * and switches the channel's interest to write events.
 *
 * If the peer has closed the connection or the read fails, the key is
 * cancelled and the channel is closed.
 *
 * @param key the readable key of a client channel
 */
private void read(SelectionKey key)  {

    try{
        // Initialize Socket -----------------------------------------------------
        SocketChannel socketChannel = (SocketChannel) key.channel();

        // Reading Client Number -------------------------------------------------
        readBuffer.clear();

        int numRead;
        try {
            numRead = socketChannel.read(readBuffer);
        } catch (IOException e) {
            logger.error("Forceful shutdown--->" + key.hashCode());
            closeAfterRead(key);
            return;
        }

        // peer closed the connection
        if (numRead == -1) {
            logger.error("Graceful shutdown ---> " + key.hashCode());
            closeAfterRead(key);
            return;
        }

        // Nothing arrived yet: stay registered for OP_READ instead of
        // recording an empty client number.
        if (numRead == 0) {
            return;
        }

        // read was successful and now we can write it to String
        readBuffer.flip();
        byte[] bytes = new byte[readBuffer.remaining()];
        readBuffer.get(bytes);

        // Decode with the same charset write() uses for outgoing messages.
        String number = new String(bytes, "UTF-8");
        number = number.replace("\r\n", "");
        number = number.trim();

        // Update Connect Clients Status -----------------------------------------
        int keyHash = key.hashCode();
        Integer previousKeyHash = clientIds.get(number);
        if (previousKeyHash != null) {
            // Same number seen before: drop the association of the old key.
            connectedClients.remove(previousKeyHash);
        }
        connectedClients.put(keyHash, number);
        clientIds.put(number, keyHash);
        if (previousKeyHash == null) {
            logger.error(number + "- (" + keyHash + ") has Connected");
        } else {
            logger.error(number + "- (" + keyHash + ") REconnected");
        }

        logger.error("All clients number are:" + connectedClients.size());

        // Fire Write Operations -------------------------------------------------
        socketChannel.register(selector, SelectionKey.OP_WRITE);

    }catch(Exception ex){
        logger.error("Exception Accoured:",ex);
    }
}

/**
 * Cancels the key and closes its channel. Cancelling alone would leave the
 * socket open and leak it.
 */
private void closeAfterRead(SelectionKey key) {
    key.cancel();
    try {
        key.channel().close();
    } catch (IOException closeEx) {
        logger.warn("Failed to close channel for key " + key.hashCode(), closeEx);
    }
}

/**
 * Sends the pending message, if any, to the client that owns the key and
 * keeps the channel registered for write events.
 *
 * A key that is no longer associated with a client number, or whose channel
 * fails with an I/O error, is cancelled and its channel is closed.
 *
 * @param key the writable key of a client channel
 */
private void write(SelectionKey key)  {
    try {

        //Check channel still alive ----------------------------------------------
        String clientNumber = connectedClients.get(key.hashCode());

        if(clientNumber == null){
            closeAfterWrite(key);
            logger.info("key with hash=" + key.hashCode() + " canceled");
            return;
        }

        // Get Channel -----------------------------------------------------------
        SocketChannel socketChannel = (SocketChannel) key.channel();

        // Send Message if client number have new message ------------------------
        String message = messageToClients.get(clientNumber);
        if (message != null) {
            logger.info(clientNumber + "-" + key.hashCode()
                            + "- Sent write message");
            ByteBuffer dummyResponse = ByteBuffer.wrap((message + "\r\n").getBytes("UTF-8"));
            socketChannel.write(dummyResponse);
            if (dummyResponse.hasRemaining()) {
                // A non-blocking write may send only part of the buffer; make
                // the truncation visible instead of dropping it silently.
                logger.warn(clientNumber + "-" + key.hashCode() + "- partial write, "
                        + dummyResponse.remaining() + " bytes not sent");
            }
            messageToClients.remove(clientNumber);
        }

        // Fire new write state --------------------------------------------------
        socketChannel.register(selector, SelectionKey.OP_WRITE);

    } catch (IOException iox) {
        logger.error("Exception Accoured:key=" + key.hashCode(),iox);
        logger.info("$$$key with hash=" + key.hashCode() + " canceled");
        closeAfterWrite(key);
    }
}

/**
 * Cancels the key and closes its channel. Cancelling alone would leave the
 * socket open and leak it.
 */
private void closeAfterWrite(SelectionKey key) {
    key.cancel();
    try {
        key.channel().close();
    } catch (IOException closeEx) {
        logger.warn("Failed to close channel for key " + key.hashCode(), closeEx);
    }
}

也许某个端口每秒接受连接数有限制?!我需要至少每秒接受 1000 个 tcp 连接。有人可以帮忙吗?

更新

我使用下面这行代码将待处理连接队列(backlog)的长度改为 1000:

serverChannel.socket().bind(isa,1000);

现在它能接受更多的客户端了,但几秒钟后我仍然会收到 connection refused 错误。

您在 select 循环中浪费时间执行数据库操作,这会限制您的传入连接速率。不要这样做。 select 循环中唯一的阻塞操作应该是 select 本身。

而且您还在浪费更多时间在 select 调用之间进行一秒钟的睡眠。没有理由这样做。摆脱它。

注意:当 read() 返回 -1 时,你必须关闭通道,而不仅仅是取消选择键(SelectionKey)。否则就会造成通道泄漏。