'An established connection was aborted by the software in your host machine' 使用 java nio 套接字时
'An established connection was aborted by the software in your host machine' when using java nio socket
我使用 java nio
套接字开发了一个 java 服务器。这是我的申请代码:
public class EchoServer {
static final org.apache.log4j.Logger logger = org.apache.log4j.Logger.getLogger(Main.class);
private static final int BUFFER_SIZE = 1024;
private final static int DEFAULT_PORT = 4664;
private InetAddress hostAddress = null;
private int port;
private String ipAddress = "my ip";
private Selector selector;
// The buffer into which we'll read data when it's available
private ByteBuffer readBuffer = ByteBuffer.allocate(BUFFER_SIZE);
int timestamp = 1;
HashMap<Integer, String> connectedClients = new HashMap<Integer, String>();
HashMap<String, Integer> clientIds= new HashMap<String,Integer>();
HashMap<String, String> messageToClients = new HashMap<String, String>();
public EchoServer() {
this(DEFAULT_PORT);
}
public EchoServer(int port) {
try{
this.port = port;
hostAddress = InetAddress.getByName(ipAddress);
selector = initSelector();
loop();
}catch(Exception ex){
logger.error("Exception Accoured:",ex);
}
}
private Selector initSelector() {
try{
Selector socketSelector = SelectorProvider.provider().openSelector();
ServerSocketChannel serverChannel = ServerSocketChannel.open();
serverChannel.configureBlocking(false);
InetSocketAddress isa = new InetSocketAddress(hostAddress, port);
serverChannel.socket().bind(isa);
serverChannel.register(socketSelector, SelectionKey.OP_ACCEPT);
return socketSelector;
}catch(Exception ex){
logger.error("Exception Accoured:",ex);
return null;
}
}
private void loop() {
while (true) {
try {
// Do defined operations for clients
// ------------------------------
selector.select();
Iterator<SelectionKey> selectedKeys = selector.selectedKeys()
.iterator();
while (selectedKeys.hasNext()) {
SelectionKey key = selectedKeys.next();
selectedKeys.remove();
if (!key.isValid()) {
logger.warn(key.hashCode() + "- is invalid");
continue;
}
// Check what event is available and deal with it
if (key.isAcceptable()) {
accept(key);
} else if (key.isReadable()) {
read(key);
} else if (key.isWritable()) {
write(key);
}
}
// Fetch List from server
// -----------------------------------------
try {
ResultSet resultset = DataBase.getInstance()
.getQueryResult();
boolean flag = false;
while (resultset.next()) {
String mobileNumber = resultset.getString("MobileNo");
String message = resultset.getInt("IsMessage") + ","
+ resultset.getInt("IsDeliver") + ","
+ resultset.getInt("IsGroup") + ","
+ resultset.getInt("IsSeen");
messageToClients.put(mobileNumber, message);
}
} catch (Exception ex) {
//ex.printStackTrace();
logger.error("Exception Accoured:",ex);
}
// Wait for 1 second
// -----------------------------------------------
Thread.sleep(1000);
timestamp++;
} catch (Exception e) {
e.printStackTrace();
System.exit(1);
}
}
}
private void accept(SelectionKey key) {
try{
// Initialize the connection ------------------------------------------
ServerSocketChannel serverSocketChannel = (ServerSocketChannel) key
.channel();
SocketChannel socketChannel = serverSocketChannel.accept();
socketChannel.configureBlocking(false);
socketChannel.setOption(StandardSocketOptions.SO_KEEPALIVE, true);
socketChannel.setOption(StandardSocketOptions.TCP_NODELAY, true);
logger.info("New client accepted");
// Fire read for reading phone number --------------------------------
socketChannel.register(selector, SelectionKey.OP_READ);
}catch(Exception ex){
logger.error("Exception Accoured:",ex);
}
}
private void read(SelectionKey key) {
try{
// Initialize Socket -----------------------------------------------------
SocketChannel socketChannel = (SocketChannel) key.channel();
// Reading Client Number -------------------------------------------------
readBuffer.clear();
int numRead;
try {
numRead = socketChannel.read(readBuffer);
} catch (IOException e) {
logger.error("Forceful shutdown");
key.cancel();
return;
}
// read was not successful
if (numRead == -1) {
logger.error("Graceful shutdown");
key.cancel();
return;
}
// read was successful and now we can write it to String
readBuffer.flip();
byte[] bytes = new byte[readBuffer.limit()];
readBuffer.get(bytes);
String number = new String(bytes);
number = number.replace("\r\n", "");
number = number.trim();
// Update Connect Clients Status -----------------------------------------
Integer clientId=clientIds.get(number);
if ( clientId == null) {
connectedClients.put(key.hashCode(), number);
clientIds.put(number, key.hashCode());
logger.error(number + "- (" + key.hashCode() + ") has Connected");
}else{
connectedClients.remove(clientId);
connectedClients.put(key.hashCode(), number);
clientIds.put(number, key.hashCode());
logger.error(number + "- (" + key.hashCode() + ") REconnected");
}
//System.err.println("All clients number are:" + connectedClients.size());
logger.error("All clients number are:" + connectedClients.size());
// Fire Write Operations -------------------------------------------------
socketChannel.register(selector, SelectionKey.OP_WRITE);
}catch(Exception ex){
//ex.printStackTrace();
logger.error("Exception Accoured:",ex);
}
}
private void write(SelectionKey key) {
try {
//Check channel still alive ----------------------------------------------
String clientNumber = connectedClients.get(key.hashCode());
if(clientNumber == null){
key.cancel();
return;
}
// Get Channel -----------------------------------------------------------
SocketChannel socketChannel = (SocketChannel) key.channel();
// Send Message if client number have new message ------------------------
if (messageToClients.get(clientNumber) != null) {
logger.info(clientNumber + "-" + key.hashCode()
+ "- Sent write message");
String timeStamp = String.valueOf(timestamp);
String message = messageToClients.get(clientNumber);
ByteBuffer dummyResponse = ByteBuffer.wrap((message + "\r\n").getBytes("UTF-8"));
socketChannel.write(dummyResponse);
messageToClients.remove(clientNumber);
}
// Fire new write state --------------------------------------------------
socketChannel.register(selector, SelectionKey.OP_WRITE);
} catch (IOException iox) {
logger.error("Exception Accoured:",iox);
String number = connectedClients.get(key.hashCode());
clientIds.remove(number);
connectedClients.remove(key.hashCode());
key.cancel();
} catch(Exception ex){
logger.error("Exception Accoured:",ex);
}
}
}
当我用 2-3 个客户端进行测试时它工作正常但是当我开始用大约 100-300 个客户端测试它时我多次收到以下异常(实际上它发生在 write()
方法上并且第 socketChannel.write(dummyResponse);
行:
java.io.IOException: An established connection was aborted by the software in your host machine
at sun.nio.ch.SocketDispatcher.write0(Native Method)
at sun.nio.ch.SocketDispatcher.write(Unknown Source)
at sun.nio.ch.IOUtil.writeFromNativeBuffer(Unknown Source)
at sun.nio.ch.IOUtil.write(Unknown Source)
at sun.nio.ch.SocketChannelImpl.write(Unknown Source)
at net.behboodi.testserver.EchoServer.write(EchoServer.java:274)
at net.behboodi.testserver.EchoServer.loop(EchoServer.java:106)
at net.behboodi.testserver.EchoServer.<init>(EchoServer.java:56)
at net.behboodi.testserver.EchoServer.<init>(EchoServer.java:47)
at net.behboodi.testserver.Main.main(Main.java:44)
然后我无法从服务器接收消息。
当您结束流 (read()
returns -1) 时,您并没有关闭频道,所以您正在泄漏频道。您需要关闭通道,注意这样做会自动取消密钥。仅取消密钥本身是不够的。当你因为 clientNumber == null
或你得到 IOException
.
而取消密钥时也是如此
注意你不应该注册 OP_WRITE
除非你刚刚从 write()
电话中收到零,并且当你没有收到零时你应该注销它。
我使用 java nio
套接字开发了一个 java 服务器。这是我的申请代码:
public class EchoServer {
static final org.apache.log4j.Logger logger = org.apache.log4j.Logger.getLogger(Main.class);
private static final int BUFFER_SIZE = 1024;
private final static int DEFAULT_PORT = 4664;
private InetAddress hostAddress = null;
private int port;
private String ipAddress = "my ip";
private Selector selector;
// The buffer into which we'll read data when it's available
private ByteBuffer readBuffer = ByteBuffer.allocate(BUFFER_SIZE);
int timestamp = 1;
HashMap<Integer, String> connectedClients = new HashMap<Integer, String>();
HashMap<String, Integer> clientIds= new HashMap<String,Integer>();
HashMap<String, String> messageToClients = new HashMap<String, String>();
public EchoServer() {
this(DEFAULT_PORT);
}
public EchoServer(int port) {
try{
this.port = port;
hostAddress = InetAddress.getByName(ipAddress);
selector = initSelector();
loop();
}catch(Exception ex){
logger.error("Exception Accoured:",ex);
}
}
private Selector initSelector() {
try{
Selector socketSelector = SelectorProvider.provider().openSelector();
ServerSocketChannel serverChannel = ServerSocketChannel.open();
serverChannel.configureBlocking(false);
InetSocketAddress isa = new InetSocketAddress(hostAddress, port);
serverChannel.socket().bind(isa);
serverChannel.register(socketSelector, SelectionKey.OP_ACCEPT);
return socketSelector;
}catch(Exception ex){
logger.error("Exception Accoured:",ex);
return null;
}
}
private void loop() {
while (true) {
try {
// Do defined operations for clients
// ------------------------------
selector.select();
Iterator<SelectionKey> selectedKeys = selector.selectedKeys()
.iterator();
while (selectedKeys.hasNext()) {
SelectionKey key = selectedKeys.next();
selectedKeys.remove();
if (!key.isValid()) {
logger.warn(key.hashCode() + "- is invalid");
continue;
}
// Check what event is available and deal with it
if (key.isAcceptable()) {
accept(key);
} else if (key.isReadable()) {
read(key);
} else if (key.isWritable()) {
write(key);
}
}
// Fetch List from server
// -----------------------------------------
try {
ResultSet resultset = DataBase.getInstance()
.getQueryResult();
boolean flag = false;
while (resultset.next()) {
String mobileNumber = resultset.getString("MobileNo");
String message = resultset.getInt("IsMessage") + ","
+ resultset.getInt("IsDeliver") + ","
+ resultset.getInt("IsGroup") + ","
+ resultset.getInt("IsSeen");
messageToClients.put(mobileNumber, message);
}
} catch (Exception ex) {
//ex.printStackTrace();
logger.error("Exception Accoured:",ex);
}
// Wait for 1 second
// -----------------------------------------------
Thread.sleep(1000);
timestamp++;
} catch (Exception e) {
e.printStackTrace();
System.exit(1);
}
}
}
private void accept(SelectionKey key) {
try{
// Initialize the connection ------------------------------------------
ServerSocketChannel serverSocketChannel = (ServerSocketChannel) key
.channel();
SocketChannel socketChannel = serverSocketChannel.accept();
socketChannel.configureBlocking(false);
socketChannel.setOption(StandardSocketOptions.SO_KEEPALIVE, true);
socketChannel.setOption(StandardSocketOptions.TCP_NODELAY, true);
logger.info("New client accepted");
// Fire read for reading phone number --------------------------------
socketChannel.register(selector, SelectionKey.OP_READ);
}catch(Exception ex){
logger.error("Exception Accoured:",ex);
}
}
private void read(SelectionKey key) {
try{
// Initialize Socket -----------------------------------------------------
SocketChannel socketChannel = (SocketChannel) key.channel();
// Reading Client Number -------------------------------------------------
readBuffer.clear();
int numRead;
try {
numRead = socketChannel.read(readBuffer);
} catch (IOException e) {
logger.error("Forceful shutdown");
key.cancel();
return;
}
// read was not successful
if (numRead == -1) {
logger.error("Graceful shutdown");
key.cancel();
return;
}
// read was successful and now we can write it to String
readBuffer.flip();
byte[] bytes = new byte[readBuffer.limit()];
readBuffer.get(bytes);
String number = new String(bytes);
number = number.replace("\r\n", "");
number = number.trim();
// Update Connect Clients Status -----------------------------------------
Integer clientId=clientIds.get(number);
if ( clientId == null) {
connectedClients.put(key.hashCode(), number);
clientIds.put(number, key.hashCode());
logger.error(number + "- (" + key.hashCode() + ") has Connected");
}else{
connectedClients.remove(clientId);
connectedClients.put(key.hashCode(), number);
clientIds.put(number, key.hashCode());
logger.error(number + "- (" + key.hashCode() + ") REconnected");
}
//System.err.println("All clients number are:" + connectedClients.size());
logger.error("All clients number are:" + connectedClients.size());
// Fire Write Operations -------------------------------------------------
socketChannel.register(selector, SelectionKey.OP_WRITE);
}catch(Exception ex){
//ex.printStackTrace();
logger.error("Exception Accoured:",ex);
}
}
private void write(SelectionKey key) {
try {
//Check channel still alive ----------------------------------------------
String clientNumber = connectedClients.get(key.hashCode());
if(clientNumber == null){
key.cancel();
return;
}
// Get Channel -----------------------------------------------------------
SocketChannel socketChannel = (SocketChannel) key.channel();
// Send Message if client number have new message ------------------------
if (messageToClients.get(clientNumber) != null) {
logger.info(clientNumber + "-" + key.hashCode()
+ "- Sent write message");
String timeStamp = String.valueOf(timestamp);
String message = messageToClients.get(clientNumber);
ByteBuffer dummyResponse = ByteBuffer.wrap((message + "\r\n").getBytes("UTF-8"));
socketChannel.write(dummyResponse);
messageToClients.remove(clientNumber);
}
// Fire new write state --------------------------------------------------
socketChannel.register(selector, SelectionKey.OP_WRITE);
} catch (IOException iox) {
logger.error("Exception Accoured:",iox);
String number = connectedClients.get(key.hashCode());
clientIds.remove(number);
connectedClients.remove(key.hashCode());
key.cancel();
} catch(Exception ex){
logger.error("Exception Accoured:",ex);
}
}
}
当我用 2-3 个客户端进行测试时它工作正常但是当我开始用大约 100-300 个客户端测试它时我多次收到以下异常(实际上它发生在 write()
方法上并且第 socketChannel.write(dummyResponse);
行:
java.io.IOException: An established connection was aborted by the software in your host machine
at sun.nio.ch.SocketDispatcher.write0(Native Method)
at sun.nio.ch.SocketDispatcher.write(Unknown Source)
at sun.nio.ch.IOUtil.writeFromNativeBuffer(Unknown Source)
at sun.nio.ch.IOUtil.write(Unknown Source)
at sun.nio.ch.SocketChannelImpl.write(Unknown Source)
at net.behboodi.testserver.EchoServer.write(EchoServer.java:274)
at net.behboodi.testserver.EchoServer.loop(EchoServer.java:106)
at net.behboodi.testserver.EchoServer.<init>(EchoServer.java:56)
at net.behboodi.testserver.EchoServer.<init>(EchoServer.java:47)
at net.behboodi.testserver.Main.main(Main.java:44)
然后我无法从服务器接收消息。
当您结束流 (read()
returns -1) 时,您并没有关闭频道,所以您正在泄漏频道。您需要关闭通道,注意这样做会自动取消密钥。仅取消密钥本身是不够的。当你因为 clientNumber == null
或你得到 IOException
.
注意你不应该注册 OP_WRITE
除非你刚刚从 write()
电话中收到零,并且当你没有收到零时你应该注销它。