MulServer - Client communication: After a shutdown, connected clients can still interact [WHY?]
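Update: Many thanks to Antonioosss and Peter Lawrey!
I created a multithreaded server-client communication.
I have 3 classes: Server, Client, RequestHandler.
The server opens a ServerSocket and then listens for clients via accept(). When a client connects, the server hands that client's tasks (some strings) to a RequestHandler.
The command that matters to me is "SHUTDOWN".
If the RequestHandler finds this command, it calls a method in the server to shut down.
That method is based on the usage example in the ExecutorService documentation:
https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/ExecutorService.html (if you don't want to click the link, see the bold text for the method)
You don't have to read the code provided below, but I include it in case anyone is interested.
Usage example of the method: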
void shutdownAndAwaitTermination(ExecutorService pool) {
    pool.shutdown(); // Disable new tasks from being submitted
    try {
        // Wait a while for existing tasks to terminate
        if (!pool.awaitTermination(60, TimeUnit.SECONDS)) {
            pool.shutdownNow(); // Cancel currently executing tasks
            // Wait a while for tasks to respond to being cancelled
            if (!pool.awaitTermination(60, TimeUnit.SECONDS))
                System.err.println("Pool did not terminate");
        }
    } catch (InterruptedException ie) {
        // (Re-)Cancel if current thread also interrupted
        pool.shutdownNow();
        // Preserve interrupt status
        Thread.currentThread().interrupt();
    }
}
public class MulServer_v1 {
    protected static int portNumber = 8540;
    protected static int max_Clients = 3;
    protected static boolean shutdownFlag = false;
    private static ServerSocket serverSocket;
    protected ExecutorService executor;
    protected static ArrayList<Socket> socketList = new ArrayList<>();

    public MulServer_v1(int portNumber, int poolSize) {
    }

    public void runServer() {
        try {
            serverSocket = new ServerSocket(portNumber);
            executor = Executors.newFixedThreadPool(max_Clients);
        } catch (IOException e) {
            System.out.println("Could not create server on specific port");
            e.printStackTrace();
        }
        while (!shutdownFlag) {
            try {
                Socket clientSocket = serverSocket.accept();
                socketList.add(clientSocket);
                executor.submit(new RequestHandler_v1(clientSocket));
            } catch (IOException e) {
                System.out.println("Couldn't accept on the Socket");
                executor.shutdown();
                e.printStackTrace();
            }
        }
        shutdownAndAwaitTermination();
    }

    public void shutdownAndAwaitTermination() {
        System.out.println("Shutting down..");
        executor.shutdown(); // Disable new tasks from being submitted
        try {
            // Wait a while for existing tasks to terminate
            if (!executor.awaitTermination(10, TimeUnit.SECONDS)) {
                executor.shutdownNow();
                // Cancel currently executing tasks
                System.out.println("komme ich hierhin?");
                // Wait a while for tasks to respond to being cancelled
                if (!executor.awaitTermination(10, TimeUnit.SECONDS))
                    System.err.println("Pool did not terminate");
            }
        } catch (InterruptedException ie) {
            // (Re-)Cancel if current thread also interrupted
            executor.shutdownNow();
            // Preserve interrupt status
            Thread.currentThread().interrupt();
        }
        try {
            serverSocket.close();
        } catch (IOException e) {
            System.out.println("Serversocket konnte nicht geschlossen werden");
            e.printStackTrace();
        }
        System.out.println("I got here!");
        for (Socket s : socketList) {
            if (s != null) {
                try {
                    s.close();
                } catch (IOException e) {
                    System.out.println("Couldn't close the socket");
                    e.printStackTrace();
                }
            }
        }
    }

    public static void main(String[] args) {
        MulServer_v1 server = new MulServer_v1(portNumber, max_Clients);
        server.runServer();
    }
}
public class Client_v1 {
    public static final String HOSTNAME = "localhost";
    public static final int PORTNUMBER = 8540;
    private static boolean clientClose = false;

    public static void main(String[] args) throws IOException {
        System.out.println("Client started");
        try (Socket socket = new Socket(HOSTNAME, PORTNUMBER);
                PrintWriter out = new PrintWriter(socket.getOutputStream(),
                        true);
                // InputStream test = echoSocket.getInputStream();
                BufferedReader in = new BufferedReader(new InputStreamReader(
                        socket.getInputStream()));
                BufferedReader stdIn = new BufferedReader(
                        new InputStreamReader(System.in))) {
            String userInput;
            while ((userInput = stdIn.readLine()) != null && !clientClose) {
                out.println(userInput);
                System.out.println("echo: " + in.readLine());
                // if (userInput.equals("BYE")) {
                // break;
                // }
            }
        } catch (UnknownHostException e) {
            System.err.println("Don't know about host " + HOSTNAME);
            System.exit(1);
        } catch (IOException e) {
            System.err.println("Couldn't get I/O for the connection to "
                    + HOSTNAME);
            System.exit(1);
        }
    }

    protected static void closeClient() {
        clientClose = true;
    }
}
public class RequestHandler_v1 implements Runnable {
    // private final String password = "passwort";
    private final Socket client;
    private boolean closeFlag = false;

    public RequestHandler_v1(Socket client) {
        this.client = client;
    }

    @Override
    public void run() {
        try (BufferedReader in = new BufferedReader(new InputStreamReader(
                client.getInputStream()));
                BufferedWriter writer = new BufferedWriter(
                        new OutputStreamWriter(client.getOutputStream()));) {
            System.out.println("Thread started with name:"
                    + Thread.currentThread().getName());
            String userInput;
            String serverResponse;
            while ((userInput = in.readLine()) != null) {
                serverResponse = processInput(userInput);
                System.out.println("Received message from "
                        + Thread.currentThread().getName() + " : " + userInput);
                writer.write("Sever Response : " + serverResponse);
                writer.newLine();
                writer.flush();
                if (closeFlag) {
                    Client_v1.closeClient();
                    MulServer_v1.socketList.remove(client);
                    client.close();
                }
            }
        } catch (IOException e) {
            System.out.println("I/O exception: " + e);
        } catch (Exception ex) {
            System.out.println("Exception in Thread Run. Exception : " + ex);
        }
    }

    public String processInput(String input) {
        boolean commandFound = false;
        String output = "";
        try {
            if (input.getBytes("UTF-8").length > 255)
                output = "Max string length exceeded";
        } catch (UnsupportedEncodingException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
        Pattern allPattern = Pattern
                .compile("(?<lower>^LOWERCASE\\s.+)|(?<upper>^UPPERCASE\\s.+)|(?<reverse>^REVERSE\\s.+)|(?<bye>^BYE)|(?<shutdown>^SHUTDOWN passwort)");
        Matcher allMatcher = allPattern.matcher(input);
        if (allMatcher.find()) {
            String lower = allMatcher.group("lower");
            String upper = allMatcher.group("upper");
            String reverse = allMatcher.group("reverse");
            String bye = allMatcher.group("bye");
            String shutdown = allMatcher.group("shutdown");
            commandFound = true;
            if (lower != null) {
                output = lower.substring(10).toLowerCase();
            }
            if (upper != null) {
                output = upper.substring(10).toUpperCase();
            }
            if (reverse != null) {
                output = new StringBuilder(reverse.substring(8)).reverse()
                        .toString();
            }
            if (bye != null) {
                output = "BYE";
                closeFlag = true;
            }
            if (shutdown != null) {
                output = "SHUTDOWN";
                MulServer_v1.shutdownFlag = true;
                closeFlag = true;
            }
        } else {
            commandFound = false;
            output = "UNKNOWN COMMAND";
        }
        if (commandFound) {
            output = "OK ".concat(output);
        } else {
            output = "ERROR ".concat(output);
        }
        return output;
    }
}
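Shutting down works now, but new clients can still connect after the shutdown. How is that possible?
This is the console output I used to check:
Shutting down..
Thread started with name:pool-1-thread-3
Received message from pool-1-thread-3 : . //<-- this (sending a message) should NOT be able to happen, because executor.shutdown(); has already been called.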
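The problem is that your signaling is broken:
while (!shutdownFlag) {
    try {
        Socket clientSocket = serverSocket.accept();
        executor.execute(new RequestHandler_v1(clientSocket));
    } catch (IOException e) {
accept() is a blocking operation: it blocks until a new connection arrives, right? That is the culprit. When the client that later sends the "SHUTDOWN" command connects, the accepting thread unblocks, submits the handler task, passes the while condition (the flag is still false at that point), and blocks again on accept(). Only afterwards does the handler running in the pool set shutdownFlag to true, but the server is still stuck in accept(), so the pool is never shut down.
Another connection attempt should wake the server up and make it honor shutdownFlag, breaking the loop and causing all handlers to die after 10 seconds.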
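One way to wake a thread that is stuck in accept() without waiting for another client is to close the ServerSocket from a different thread; the blocked accept() then throws a SocketException. The following is only a minimal sketch of that idea, not the asker's server: the class name AcceptUnblockDemo, the method runDemo(), and the use of port 0 (any free port) are assumptions made for illustration.

```java
import java.io.IOException;
import java.net.ServerSocket;
import java.net.Socket;
import java.net.SocketException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class, not part of the original server code.
public class AcceptUnblockDemo {

    // Returns true if the accept loop ended because the ServerSocket was closed.
    public static boolean runDemo() throws Exception {
        // Port 0 lets the OS pick a free port (an assumption for this demo).
        ServerSocket serverSocket = new ServerSocket(0);
        CountDownLatch loopEnded = new CountDownLatch(1);

        Thread acceptor = new Thread(() -> {
            try {
                while (true) {
                    Socket s = serverSocket.accept(); // blocks until a client connects
                    s.close();
                }
            } catch (SocketException e) {
                // Thrown when another thread closes the ServerSocket.
                loopEnded.countDown();
            } catch (IOException e) {
                e.printStackTrace();
            }
        });
        acceptor.start();

        Thread.sleep(200);    // give the acceptor time to block in accept()
        serverSocket.close(); // what a shutdown path could do in addition to setting a flag

        return loopEnded.await(5, TimeUnit.SECONDS);
    }

    public static void main(String[] args) throws Exception {
        System.out.println("accept loop stopped: " + runDemo());
    }
}
```

In the server from the question, the same effect could probably be had by letting the shutdown path close serverSocket and treating the resulting exception as the signal to leave the loop, rather than relying on the flag alone.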
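Also:
while ((userInput = in.readLine()) != null) {
is a blocking operation: it keeps your task from finishing, so the pool will never terminate. null is returned only at the end of the stream, whether the stream ends normally or abnormally. You never end the stream on either side, so the call keeps blocking.
ExecutorService#shutdownNow() does not mean that the pool's threads are killed: they are interrupted, and a thread terminates gracefully, as @PeterLawrey mentioned, only if its task checks the Thread.isInterrupted() flag.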
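To make the interrupt point concrete, here is a small sketch of a task that cooperates with shutdownNow() by checking its interrupt status. The class name InterruptDemo, the method runDemo(), and the sleep that stands in for real work are all assumptions for illustration. Note that a thread blocked in a read on a plain java.net.Socket stream generally does not react to an interrupt, which is why closing the socket is needed in your case.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical demo class, not part of the original server code.
public class InterruptDemo {

    // Returns true if the task noticed the interrupt and the pool terminated.
    public static boolean runDemo() throws InterruptedException {
        ExecutorService pool = Executors.newFixedThreadPool(1);
        CountDownLatch started = new CountDownLatch(1);
        AtomicBoolean sawInterrupt = new AtomicBoolean(false);

        pool.execute(() -> {
            started.countDown();
            try {
                // Cooperative loop: runs until the thread is interrupted.
                while (!Thread.currentThread().isInterrupted()) {
                    Thread.sleep(50); // stand-in for real work; throws if interrupted
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // restore the interrupt flag
            }
            sawInterrupt.set(true);
        });

        started.await();    // make sure the task is actually running
        pool.shutdownNow(); // interrupts the worker thread; it does not kill it
        boolean terminated = pool.awaitTermination(5, TimeUnit.SECONDS);
        return terminated && sawInterrupt.get();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("task stopped after interrupt: " + runDemo());
    }
}
```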
Proof of concept that closing the socket breaks a blocked I/O operation:
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.ServerSocket;
import java.net.Socket;

public class Buffer {
    private static Socket client;

    static class ServerThread extends Thread {
        @Override
        public void run() {
            // try-with-resources closes the listening socket when done
            try (ServerSocket serverS = new ServerSocket(1099)) {
                client = serverS.accept();
                client.getOutputStream().write('a');
                client.getOutputStream().flush();
                Thread.sleep(2000);
                client.close(); // ends the stream and unblocks the client's readLine()
            } catch (IOException | InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    static class ClientThread extends Thread {
        @Override
        public void run() {
            try {
                Thread.sleep(500);
                Socket socket = new Socket("127.0.0.1", 1099);
                BufferedReader input = new BufferedReader(new InputStreamReader(socket.getInputStream()));
                System.out.println("Will try to read");
                String line = null;
                while ((line = input.readLine()) != null) { // blocks here until the server closes the socket
                    System.out.println("Read " + line); // not reached while the connection stays open
                }
            } catch (Exception e) {
                System.out.println("Server closed the connection!");
            }
        }
    }

    public static void main(String[] args) throws InterruptedException {
        new ServerThread().start();
        ClientThread t = new ClientThread();
        t.start();
        t.join();
    }
}
If you comment out client.close(); the application will never end, just as in your case.