需要 ServerSocketChannel 每秒接受 1000 个 TCP 连接
Need ServerSocketChannel to accept 1000 TCP connections per second
我正在使用以下代码连接我的自定义 Java NIO 服务器:
/**
 * Load-test driver: creates and starts one {@code RunnableDemo} client per
 * phone number, pausing after each launch to throttle the connection rate.
 */
public static void main(String[] args) {
    try {
        String value[] = { "00*********402", "00*********383",.....}
        for (int idx = 0; idx < value.length; idx++) {
            new RunnableDemo(value[idx]).start();
            try {
                // Delay between launches; the author reports failures when this is lowered.
                Thread.sleep(1000);
            } catch (InterruptedException interrupted) {
                interrupted.printStackTrace();
            }
        }
    } catch (Exception failure) {
        failure.printStackTrace();
    }
}
/**
 * Test client: opens one TCP connection to the server, sends its phone number,
 * then prints every line the server pushes back until the connection ends.
 */
class RunnableDemo implements Runnable {
    private Socket socket;
    private Thread t;
    private String threadName; // doubles as the client (phone) number

    /** Creates a client whose name is the decimal form of {@code phoneNumber}. */
    RunnableDemo(int phoneNumber) {
        this(String.valueOf(phoneNumber));
    }

    /** Creates a client named after {@code phoneNumber}. */
    RunnableDemo(String phoneNumber) {
        threadName = phoneNumber;
        System.err.println("Creating " + threadName);
    }

    /**
     * Connects, sends the client number and echoes received lines to stdout.
     * Returns when the server closes the connection or an I/O error occurs;
     * the socket is always closed on the way out.
     */
    @Override
    public void run() {
        System.err.println("Running " + threadName);
        // Alternate host previously used by the author: 94.232.174.97
        try (Socket connection = new Socket("192.168.20.22", 4664)) {
            socket = connection;
            // The server encodes its replies as UTF-8, so use it in both directions.
            PrintWriter testWriter = new PrintWriter(
                    new OutputStreamWriter(connection.getOutputStream(), "UTF-8"));
            testWriter.print(threadName);
            testWriter.flush();
            BufferedReader bufferedIn = new BufferedReader(
                    new InputStreamReader(connection.getInputStream(), "UTF-8"));
            String incomingMessage;
            // readLine() returns null at end of stream; stop instead of spinning on null.
            while ((incomingMessage = bufferedIn.readLine()) != null) {
                System.out.println("recived message: " + incomingMessage);
            }
        } catch (Exception e) {
            System.out.println("Thread " + threadName + " interrupted.");
            e.printStackTrace();
        }
        System.out.println("Thread " + threadName + " exiting.");
    }

    /** Placeholder kept for interface compatibility; intentionally does nothing. */
    public void read() {
    }

    /** Starts the client on its own thread; later calls are no-ops. */
    public void start() {
        System.out.println("Starting " + threadName);
        try {
            if (t == null) {
                t = new Thread(this, threadName);
                t.start();
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
当我每 1000 毫秒创建一个客户端线程时,它工作正常;但是当我把间隔减少到 100 毫秒(即每秒有 10 个客户端连接到服务器)时,几秒钟后我的客户端线程就会收到以下错误:
java.net.ConnectException: Connection refused: connect
at java.net.DualStackPlainSocketImpl.connect0(Native Method)
at java.net.DualStackPlainSocketImpl.socketConnect(Unknown Source)
at java.net.AbstractPlainSocketImpl.doConnect(Unknown Source)
at java.net.AbstractPlainSocketImpl.connectToAddress(Unknown Source)
at java.net.AbstractPlainSocketImpl.connect(Unknown Source)
at java.net.PlainSocketImpl.connect(Unknown Source)
at java.net.SocksSocketImpl.connect(Unknown Source)
at java.net.Socket.connect(Unknown Source)
at java.net.Socket.connect(Unknown Source)
at java.net.Socket.<init>(Unknown Source)
at java.net.Socket.<init>(Unknown Source)
at RunnableDemo.run(Main.java:419)
at java.lang.Thread.run(Unknown Source)
这也是服务器部分:
/**
 * Selector-based TCP server. Clients identify themselves by sending a phone
 * number; the server then pushes them messages loaded from the database.
 */
public class EchoServer {
// NOTE(review): the logger category is Main, not EchoServer — confirm this is intended.
static final org.apache.log4j.Logger logger = org.apache.log4j.Logger.getLogger(Main.class);
// Capacity in bytes of the shared read buffer below.
private static final int BUFFER_SIZE = 1024;
// Port used by the no-argument constructor.
private final static int DEFAULT_PORT = 4664;
// Resolved from ipAddress in the constructor.
private InetAddress hostAddress = null;
private int port;
// Address the server socket is bound to.
private String ipAddress = "192.168.20.22";
private Selector selector;
// The buffer into which we'll read data when it's available
private ByteBuffer readBuffer = ByteBuffer.allocate(BUFFER_SIZE);
// Incremented once per pass of loop().
int timestamp = 1;
// SelectionKey.hashCode() -> client number.
// NOTE(review): hash codes are not guaranteed unique, so two keys could collide.
HashMap<Integer, String> connectedClients = new HashMap<Integer, String>();
// Client number -> SelectionKey.hashCode() of its most recent connection.
HashMap<String, Integer> clientIds= new HashMap<String,Integer>();
// Client number -> message waiting to be written to that client.
HashMap<String, String> messageToClients = new HashMap<String, String>();
/** Starts the server on DEFAULT_PORT. */
public EchoServer() {
this(DEFAULT_PORT);
}
/**
 * Resolves the bind address, opens the selector and then enters loop().
 * loop() contains an endless while(true), so this constructor does not
 * return normally; any exception escaping setup is logged and swallowed.
 *
 * @param port TCP port to listen on
 */
public EchoServer(int port) {
try{
this.port = port;
hostAddress = InetAddress.getByName(ipAddress);
selector = initSelector();
loop();
}catch(Exception ex){
logger.error("Exception Accoured:",ex);
}
}
/**
 * Opens a selector and a non-blocking server channel bound to
 * {@code hostAddress:port}, registered for accept events.
 *
 * @return the selector, or {@code null} if setup failed (the failure is
 *         logged and anything already opened is closed again)
 */
private Selector initSelector() {
    Selector socketSelector = null;
    ServerSocketChannel serverChannel = null;
    try {
        socketSelector = SelectorProvider.provider().openSelector();
        serverChannel = ServerSocketChannel.open();
        serverChannel.configureBlocking(false);
        InetSocketAddress isa = new InetSocketAddress(hostAddress, port);
        serverChannel.socket().bind(isa);
        serverChannel.register(socketSelector, SelectionKey.OP_ACCEPT);
        return socketSelector;
    } catch (Exception ex) {
        logger.error("Exception Accoured:", ex);
        // Do not leak the half-initialised channel/selector (e.g. when bind fails).
        if (serverChannel != null) {
            try {
                serverChannel.close();
            } catch (IOException closeEx) {
                logger.error("Exception Accoured:", closeEx);
            }
        }
        if (socketSelector != null) {
            try {
                socketSelector.close();
            } catch (IOException closeEx) {
                logger.error("Exception Accoured:", closeEx);
            }
        }
        return null;
    }
}
/**
 * Main server loop; never returns. Each pass waits for ready keys, dispatches
 * every selected key to accept/read/write, reloads messageToClients from the
 * database, sleeps one second and increments timestamp. Any exception that
 * reaches the outer catch terminates the JVM with exit code 1.
 *
 * NOTE(review): the database query and the one-second sleep both run on this
 * thread between select() calls, so no key (including pending accepts) is
 * serviced during that time.
 */
private void loop() {
while (true) {
try {
// Do defined operations for clients
// ------------------------------
selector.select();
Iterator<SelectionKey> selectedKeys = selector.selectedKeys()
.iterator();
// Number of valid keys handled in this pass (logged below).
int c = 0;
while (selectedKeys.hasNext()) {
SelectionKey key = selectedKeys.next();
// Remove from the selected set so the key is not handled again next pass.
selectedKeys.remove();
if (!key.isValid()) {
logger.warn(key.hashCode() + "- is invalid");
continue;
}
// Check what event is available and deal with it
// Only the first matching operation is handled for a key in a given pass.
if (key.isAcceptable()) {
accept(key);
} else if (key.isReadable()) {
read(key);
} else if (key.isWritable()) {
write(key);
}
c++;
}
logger.info(c + " keys has been iterated");
// Fetch List from server
// -----------------------------------------
try {
// NOTE(review): resultset is not closed in this method — confirm DataBase manages it.
ResultSet resultset = DataBase.getInstance()
.getQueryResult();
while (resultset.next()) {
String mobileNumber = resultset.getString("MobileNo");
// Message format: IsMessage,IsDeliver,IsGroup,IsSeen
String message = resultset.getInt("IsMessage") + ","
+ resultset.getInt("IsDeliver") + ","
+ resultset.getInt("IsGroup") + ","
+ resultset.getInt("IsSeen");
// A newer row for the same number overwrites the pending message.
messageToClients.put(mobileNumber, message);
}
} catch (Exception ex) {
//ex.printStackTrace();
// Database failures are logged and the loop carries on.
logger.error("Exception Accoured:",ex);
}
// Wait for 1 second
// -----------------------------------------------
Thread.sleep(1000);
timestamp++;
} catch (Exception e) {
e.printStackTrace();
System.exit(1);
}
}
}
/**
 * Accepts every connection currently pending on the server channel and
 * registers each one for reading (the client sends its phone number first).
 *
 * @param key the selected key of the listening ServerSocketChannel
 */
private void accept(SelectionKey key) {
    try {
        ServerSocketChannel serverSocketChannel = (ServerSocketChannel) key.channel();
        SocketChannel socketChannel;
        // In non-blocking mode accept() returns null once the backlog is empty.
        // Draining it here takes all waiting clients in one selector pass
        // instead of a single client per pass.
        while ((socketChannel = serverSocketChannel.accept()) != null) {
            try {
                socketChannel.configureBlocking(false);
                socketChannel.setOption(StandardSocketOptions.SO_KEEPALIVE, true);
                socketChannel.setOption(StandardSocketOptions.TCP_NODELAY, true);
                logger.info("New client accepted");
                // Fire read for reading phone number
                socketChannel.register(selector, SelectionKey.OP_READ);
            } catch (Exception ex) {
                logger.error("Exception Accoured:", ex);
                // The connection is unusable; close it so the socket does not leak.
                try {
                    socketChannel.close();
                } catch (IOException closeEx) {
                    logger.error("Exception Accoured:", closeEx);
                }
            }
        }
    } catch (Exception ex) {
        logger.error("Exception Accoured:", ex);
    }
}
/**
 * Reads the client's phone number from the channel, records the client as
 * connected (or reconnected) and switches the channel to write interest.
 * If the peer has closed the connection or the read fails, the channel is
 * closed so that the socket is released.
 *
 * @param key the selected key of a readable client SocketChannel
 */
private void read(SelectionKey key) {
    try {
        SocketChannel socketChannel = (SocketChannel) key.channel();

        // Reading client number
        readBuffer.clear();
        int numRead;
        try {
            numRead = socketChannel.read(readBuffer);
        } catch (IOException e) {
            logger.error("Forceful shutdown--->" + key.hashCode());
            key.cancel();
            // Cancelling the key alone leaves the socket open; close it too.
            socketChannel.close();
            return;
        }
        // End of stream: the peer closed the connection.
        if (numRead == -1) {
            logger.error("Graceful shutdown ---> " + key.hashCode());
            key.cancel();
            socketChannel.close();
            return;
        }
        // Nothing arrived; wait for the next readiness event instead of
        // registering an empty client number.
        if (numRead == 0) {
            return;
        }

        // Decode the received bytes; UTF-8 matches the encoding used by write().
        readBuffer.flip();
        byte[] bytes = new byte[readBuffer.remaining()];
        readBuffer.get(bytes);
        String number = new String(bytes, "UTF-8").replace("\r\n", "").trim();

        // Update connected-clients status; an existing entry means a reconnect,
        // so the mapping of the previous connection is dropped first.
        Integer previousId = clientIds.get(number);
        if (previousId != null) {
            connectedClients.remove(previousId);
        }
        connectedClients.put(key.hashCode(), number);
        clientIds.put(number, key.hashCode());
        logger.error(number + "- (" + key.hashCode() + ") "
                + (previousId == null ? "has Connected" : "REconnected"));
        logger.error("All clients number are:" + connectedClients.size());

        // Fire write operations
        socketChannel.register(selector, SelectionKey.OP_WRITE);
    } catch (Exception ex) {
        logger.error("Exception Accoured:", ex);
    }
}
/**
 * Sends the pending message (if any) to the client behind {@code key} and
 * keeps the channel registered for writing. Channels that no longer map to
 * a known client, or whose write fails, are cancelled and closed.
 *
 * @param key the selected key of a writable client SocketChannel
 */
private void write(SelectionKey key) {
    SocketChannel socketChannel = (SocketChannel) key.channel();
    try {
        // Check channel still alive: a missing entry means this connection was
        // superseded (e.g. the same number reconnected), so release the socket.
        String clientNumber = connectedClients.get(key.hashCode());
        if (clientNumber == null) {
            key.cancel();
            socketChannel.close();
            logger.info("key with hash=" + key.hashCode() + " canceled");
            return;
        }

        // Send message if this client number has a new one
        String message = messageToClients.get(clientNumber);
        if (message != null) {
            logger.info(clientNumber + "-" + key.hashCode()
                    + "- Sent write message");
            ByteBuffer dummyResponse = ByteBuffer.wrap((message + "\r\n").getBytes("UTF-8"));
            // NOTE(review): a non-blocking write may send only part of the buffer;
            // any remainder is dropped here, as in the original code.
            socketChannel.write(dummyResponse);
            // Removed only after a successful write so a failed send is not lost.
            messageToClients.remove(clientNumber);
        }

        // Fire new write state
        socketChannel.register(selector, SelectionKey.OP_WRITE);
    } catch (IOException iox) {
        logger.error("Exception Accoured:key=" + key.hashCode(), iox);
        logger.info("$$$key with hash=" + key.hashCode() + " canceled");
        key.cancel();
        // Cancelling the key does not close the socket; close it explicitly.
        try {
            socketChannel.close();
        } catch (IOException closeEx) {
            logger.error("Exception Accoured:key=" + key.hashCode(), closeEx);
        }
    }
}
也许某个端口每秒接受连接数有限制?!我需要至少每秒接受 1000 个 tcp 连接。有人可以帮忙吗?
更新
我使用这行代码将待处理连接队列(backlog)的长度设置为 1000:
serverChannel.socket().bind(isa,1000);
现在它能接受更多的客户端,但几秒钟后我仍然收到 Connection refused 错误。
您在 select 循环中执行数据库操作,浪费了时间,这会限制您接受传入连接的速率。不要这样做。select 循环中唯一的阻塞操作应该是 select() 本身。
而且您还在两次 select() 调用之间睡眠一秒钟,浪费了更多时间。没有理由这样做,请把它去掉。
注意:当 read() 返回 -1 时,您必须关闭通道,而不仅仅是取消选择键(SelectionKey),否则就会造成通道泄漏。
我正在使用以下代码连接我的自定义 Java NIO 服务器:
/**
 * Load-test driver: creates and starts one {@code RunnableDemo} client per
 * phone number, pausing after each launch to throttle the connection rate.
 */
public static void main(String[] args) {
    try {
        String value[] = { "00*********402", "00*********383",.....}
        for (int idx = 0; idx < value.length; idx++) {
            new RunnableDemo(value[idx]).start();
            try {
                // Delay between launches; the author reports failures when this is lowered.
                Thread.sleep(1000);
            } catch (InterruptedException interrupted) {
                interrupted.printStackTrace();
            }
        }
    } catch (Exception failure) {
        failure.printStackTrace();
    }
}
/**
 * Test client: opens one TCP connection to the server, sends its phone number,
 * then prints every line the server pushes back until the connection ends.
 */
class RunnableDemo implements Runnable {
    private Socket socket;
    private Thread t;
    private String threadName; // doubles as the client (phone) number

    /** Creates a client whose name is the decimal form of {@code phoneNumber}. */
    RunnableDemo(int phoneNumber) {
        this(String.valueOf(phoneNumber));
    }

    /** Creates a client named after {@code phoneNumber}. */
    RunnableDemo(String phoneNumber) {
        threadName = phoneNumber;
        System.err.println("Creating " + threadName);
    }

    /**
     * Connects, sends the client number and echoes received lines to stdout.
     * Returns when the server closes the connection or an I/O error occurs;
     * the socket is always closed on the way out.
     */
    @Override
    public void run() {
        System.err.println("Running " + threadName);
        // Alternate host previously used by the author: 94.232.174.97
        try (Socket connection = new Socket("192.168.20.22", 4664)) {
            socket = connection;
            // The server encodes its replies as UTF-8, so use it in both directions.
            PrintWriter testWriter = new PrintWriter(
                    new OutputStreamWriter(connection.getOutputStream(), "UTF-8"));
            testWriter.print(threadName);
            testWriter.flush();
            BufferedReader bufferedIn = new BufferedReader(
                    new InputStreamReader(connection.getInputStream(), "UTF-8"));
            String incomingMessage;
            // readLine() returns null at end of stream; stop instead of spinning on null.
            while ((incomingMessage = bufferedIn.readLine()) != null) {
                System.out.println("recived message: " + incomingMessage);
            }
        } catch (Exception e) {
            System.out.println("Thread " + threadName + " interrupted.");
            e.printStackTrace();
        }
        System.out.println("Thread " + threadName + " exiting.");
    }

    /** Placeholder kept for interface compatibility; intentionally does nothing. */
    public void read() {
    }

    /** Starts the client on its own thread; later calls are no-ops. */
    public void start() {
        System.out.println("Starting " + threadName);
        try {
            if (t == null) {
                t = new Thread(this, threadName);
                t.start();
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
当我每 1000 毫秒创建一个客户端线程时,它工作正常;但是当我把间隔减少到 100 毫秒(即每秒有 10 个客户端连接到服务器)时,几秒钟后我的客户端线程就会收到以下错误:
java.net.ConnectException: Connection refused: connect
at java.net.DualStackPlainSocketImpl.connect0(Native Method)
at java.net.DualStackPlainSocketImpl.socketConnect(Unknown Source)
at java.net.AbstractPlainSocketImpl.doConnect(Unknown Source)
at java.net.AbstractPlainSocketImpl.connectToAddress(Unknown Source)
at java.net.AbstractPlainSocketImpl.connect(Unknown Source)
at java.net.PlainSocketImpl.connect(Unknown Source)
at java.net.SocksSocketImpl.connect(Unknown Source)
at java.net.Socket.connect(Unknown Source)
at java.net.Socket.connect(Unknown Source)
at java.net.Socket.<init>(Unknown Source)
at java.net.Socket.<init>(Unknown Source)
at RunnableDemo.run(Main.java:419)
at java.lang.Thread.run(Unknown Source)
这也是服务器部分:
/**
 * Selector-based TCP server. Clients identify themselves by sending a phone
 * number; the server then pushes them messages loaded from the database.
 */
public class EchoServer {
// NOTE(review): the logger category is Main, not EchoServer — confirm this is intended.
static final org.apache.log4j.Logger logger = org.apache.log4j.Logger.getLogger(Main.class);
// Capacity in bytes of the shared read buffer below.
private static final int BUFFER_SIZE = 1024;
// Port used by the no-argument constructor.
private final static int DEFAULT_PORT = 4664;
// Resolved from ipAddress in the constructor.
private InetAddress hostAddress = null;
private int port;
// Address the server socket is bound to.
private String ipAddress = "192.168.20.22";
private Selector selector;
// The buffer into which we'll read data when it's available
private ByteBuffer readBuffer = ByteBuffer.allocate(BUFFER_SIZE);
// Incremented once per pass of loop().
int timestamp = 1;
// SelectionKey.hashCode() -> client number.
// NOTE(review): hash codes are not guaranteed unique, so two keys could collide.
HashMap<Integer, String> connectedClients = new HashMap<Integer, String>();
// Client number -> SelectionKey.hashCode() of its most recent connection.
HashMap<String, Integer> clientIds= new HashMap<String,Integer>();
// Client number -> message waiting to be written to that client.
HashMap<String, String> messageToClients = new HashMap<String, String>();
/** Starts the server on DEFAULT_PORT. */
public EchoServer() {
this(DEFAULT_PORT);
}
/**
 * Resolves the bind address, opens the selector and then enters loop().
 * loop() contains an endless while(true), so this constructor does not
 * return normally; any exception escaping setup is logged and swallowed.
 *
 * @param port TCP port to listen on
 */
public EchoServer(int port) {
try{
this.port = port;
hostAddress = InetAddress.getByName(ipAddress);
selector = initSelector();
loop();
}catch(Exception ex){
logger.error("Exception Accoured:",ex);
}
}
/**
 * Opens a selector and a non-blocking server channel bound to
 * {@code hostAddress:port}, registered for accept events.
 *
 * @return the selector, or {@code null} if setup failed (the failure is
 *         logged and anything already opened is closed again)
 */
private Selector initSelector() {
    Selector socketSelector = null;
    ServerSocketChannel serverChannel = null;
    try {
        socketSelector = SelectorProvider.provider().openSelector();
        serverChannel = ServerSocketChannel.open();
        serverChannel.configureBlocking(false);
        InetSocketAddress isa = new InetSocketAddress(hostAddress, port);
        serverChannel.socket().bind(isa);
        serverChannel.register(socketSelector, SelectionKey.OP_ACCEPT);
        return socketSelector;
    } catch (Exception ex) {
        logger.error("Exception Accoured:", ex);
        // Do not leak the half-initialised channel/selector (e.g. when bind fails).
        if (serverChannel != null) {
            try {
                serverChannel.close();
            } catch (IOException closeEx) {
                logger.error("Exception Accoured:", closeEx);
            }
        }
        if (socketSelector != null) {
            try {
                socketSelector.close();
            } catch (IOException closeEx) {
                logger.error("Exception Accoured:", closeEx);
            }
        }
        return null;
    }
}
/**
 * Main server loop; never returns. Each pass waits for ready keys, dispatches
 * every selected key to accept/read/write, reloads messageToClients from the
 * database, sleeps one second and increments timestamp. Any exception that
 * reaches the outer catch terminates the JVM with exit code 1.
 *
 * NOTE(review): the database query and the one-second sleep both run on this
 * thread between select() calls, so no key (including pending accepts) is
 * serviced during that time.
 */
private void loop() {
while (true) {
try {
// Do defined operations for clients
// ------------------------------
selector.select();
Iterator<SelectionKey> selectedKeys = selector.selectedKeys()
.iterator();
// Number of valid keys handled in this pass (logged below).
int c = 0;
while (selectedKeys.hasNext()) {
SelectionKey key = selectedKeys.next();
// Remove from the selected set so the key is not handled again next pass.
selectedKeys.remove();
if (!key.isValid()) {
logger.warn(key.hashCode() + "- is invalid");
continue;
}
// Check what event is available and deal with it
// Only the first matching operation is handled for a key in a given pass.
if (key.isAcceptable()) {
accept(key);
} else if (key.isReadable()) {
read(key);
} else if (key.isWritable()) {
write(key);
}
c++;
}
logger.info(c + " keys has been iterated");
// Fetch List from server
// -----------------------------------------
try {
// NOTE(review): resultset is not closed in this method — confirm DataBase manages it.
ResultSet resultset = DataBase.getInstance()
.getQueryResult();
while (resultset.next()) {
String mobileNumber = resultset.getString("MobileNo");
// Message format: IsMessage,IsDeliver,IsGroup,IsSeen
String message = resultset.getInt("IsMessage") + ","
+ resultset.getInt("IsDeliver") + ","
+ resultset.getInt("IsGroup") + ","
+ resultset.getInt("IsSeen");
// A newer row for the same number overwrites the pending message.
messageToClients.put(mobileNumber, message);
}
} catch (Exception ex) {
//ex.printStackTrace();
// Database failures are logged and the loop carries on.
logger.error("Exception Accoured:",ex);
}
// Wait for 1 second
// -----------------------------------------------
Thread.sleep(1000);
timestamp++;
} catch (Exception e) {
e.printStackTrace();
System.exit(1);
}
}
}
/**
 * Accepts every connection currently pending on the server channel and
 * registers each one for reading (the client sends its phone number first).
 *
 * @param key the selected key of the listening ServerSocketChannel
 */
private void accept(SelectionKey key) {
    try {
        ServerSocketChannel serverSocketChannel = (ServerSocketChannel) key.channel();
        SocketChannel socketChannel;
        // In non-blocking mode accept() returns null once the backlog is empty.
        // Draining it here takes all waiting clients in one selector pass
        // instead of a single client per pass.
        while ((socketChannel = serverSocketChannel.accept()) != null) {
            try {
                socketChannel.configureBlocking(false);
                socketChannel.setOption(StandardSocketOptions.SO_KEEPALIVE, true);
                socketChannel.setOption(StandardSocketOptions.TCP_NODELAY, true);
                logger.info("New client accepted");
                // Fire read for reading phone number
                socketChannel.register(selector, SelectionKey.OP_READ);
            } catch (Exception ex) {
                logger.error("Exception Accoured:", ex);
                // The connection is unusable; close it so the socket does not leak.
                try {
                    socketChannel.close();
                } catch (IOException closeEx) {
                    logger.error("Exception Accoured:", closeEx);
                }
            }
        }
    } catch (Exception ex) {
        logger.error("Exception Accoured:", ex);
    }
}
/**
 * Reads the client's phone number from the channel, records the client as
 * connected (or reconnected) and switches the channel to write interest.
 * If the peer has closed the connection or the read fails, the channel is
 * closed so that the socket is released.
 *
 * @param key the selected key of a readable client SocketChannel
 */
private void read(SelectionKey key) {
    try {
        SocketChannel socketChannel = (SocketChannel) key.channel();

        // Reading client number
        readBuffer.clear();
        int numRead;
        try {
            numRead = socketChannel.read(readBuffer);
        } catch (IOException e) {
            logger.error("Forceful shutdown--->" + key.hashCode());
            key.cancel();
            // Cancelling the key alone leaves the socket open; close it too.
            socketChannel.close();
            return;
        }
        // End of stream: the peer closed the connection.
        if (numRead == -1) {
            logger.error("Graceful shutdown ---> " + key.hashCode());
            key.cancel();
            socketChannel.close();
            return;
        }
        // Nothing arrived; wait for the next readiness event instead of
        // registering an empty client number.
        if (numRead == 0) {
            return;
        }

        // Decode the received bytes; UTF-8 matches the encoding used by write().
        readBuffer.flip();
        byte[] bytes = new byte[readBuffer.remaining()];
        readBuffer.get(bytes);
        String number = new String(bytes, "UTF-8").replace("\r\n", "").trim();

        // Update connected-clients status; an existing entry means a reconnect,
        // so the mapping of the previous connection is dropped first.
        Integer previousId = clientIds.get(number);
        if (previousId != null) {
            connectedClients.remove(previousId);
        }
        connectedClients.put(key.hashCode(), number);
        clientIds.put(number, key.hashCode());
        logger.error(number + "- (" + key.hashCode() + ") "
                + (previousId == null ? "has Connected" : "REconnected"));
        logger.error("All clients number are:" + connectedClients.size());

        // Fire write operations
        socketChannel.register(selector, SelectionKey.OP_WRITE);
    } catch (Exception ex) {
        logger.error("Exception Accoured:", ex);
    }
}
/**
 * Sends the pending message (if any) to the client behind {@code key} and
 * keeps the channel registered for writing. Channels that no longer map to
 * a known client, or whose write fails, are cancelled and closed.
 *
 * @param key the selected key of a writable client SocketChannel
 */
private void write(SelectionKey key) {
    SocketChannel socketChannel = (SocketChannel) key.channel();
    try {
        // Check channel still alive: a missing entry means this connection was
        // superseded (e.g. the same number reconnected), so release the socket.
        String clientNumber = connectedClients.get(key.hashCode());
        if (clientNumber == null) {
            key.cancel();
            socketChannel.close();
            logger.info("key with hash=" + key.hashCode() + " canceled");
            return;
        }

        // Send message if this client number has a new one
        String message = messageToClients.get(clientNumber);
        if (message != null) {
            logger.info(clientNumber + "-" + key.hashCode()
                    + "- Sent write message");
            ByteBuffer dummyResponse = ByteBuffer.wrap((message + "\r\n").getBytes("UTF-8"));
            // NOTE(review): a non-blocking write may send only part of the buffer;
            // any remainder is dropped here, as in the original code.
            socketChannel.write(dummyResponse);
            // Removed only after a successful write so a failed send is not lost.
            messageToClients.remove(clientNumber);
        }

        // Fire new write state
        socketChannel.register(selector, SelectionKey.OP_WRITE);
    } catch (IOException iox) {
        logger.error("Exception Accoured:key=" + key.hashCode(), iox);
        logger.info("$$$key with hash=" + key.hashCode() + " canceled");
        key.cancel();
        // Cancelling the key does not close the socket; close it explicitly.
        try {
            socketChannel.close();
        } catch (IOException closeEx) {
            logger.error("Exception Accoured:key=" + key.hashCode(), closeEx);
        }
    }
}
也许某个端口每秒接受连接数有限制?!我需要至少每秒接受 1000 个 tcp 连接。有人可以帮忙吗?
更新
我使用这行代码将待处理连接队列(backlog)的长度设置为 1000:
serverChannel.socket().bind(isa,1000);
现在它能接受更多的客户端,但几秒钟后我仍然收到 Connection refused 错误。
您在 select 循环中执行数据库操作,浪费了时间,这会限制您接受传入连接的速率。不要这样做。select 循环中唯一的阻塞操作应该是 select() 本身。
而且您还在两次 select() 调用之间睡眠一秒钟,浪费了更多时间。没有理由这样做,请把它去掉。
注意:当 read() 返回 -1 时,您必须关闭通道,而不仅仅是取消选择键(SelectionKey),否则就会造成通道泄漏。