MulServer - Client communication: after a shutdown, connected clients can still interact. Why?

Update: many thanks to Antonioosss and Peter Lawrey!

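I built a multithreaded server-client communication setup.

I have 3 classes: Server, Client and RequestHandler.

The server opens a ServerSocket and then listens for clients via accept(). When a client connects, the server hands that client's tasks (some strings) to a RequestHandler.

The command that matters to me is "SHUTDOWN". When the RequestHandler finds this command, it triggers the shutdown routine inside the server.

That routine is based on the usage example in the ExecutorService documentation: https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/ExecutorService.html (if you don't want to follow the link, the method is reproduced below).

You don't have to read the code below, but I am including it in case anyone is interested.

Usage example from the documentation: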

 void shutdownAndAwaitTermination(ExecutorService pool) {
   pool.shutdown(); // Disable new tasks from being submitted
   try {
     // Wait a while for existing tasks to terminate
     if (!pool.awaitTermination(60, TimeUnit.SECONDS)) {
       pool.shutdownNow(); // Cancel currently executing tasks
       // Wait a while for tasks to respond to being cancelled
       if (!pool.awaitTermination(60, TimeUnit.SECONDS))
           System.err.println("Pool did not terminate");
     }
   } catch (InterruptedException ie) {
     // (Re-)Cancel if current thread also interrupted
     pool.shutdownNow();
     // Preserve interrupt status
     Thread.currentThread().interrupt();
   }
 }

public class MulServer_v1 {

protected static int portNumber = 8540;
protected static int max_Clients = 3;
protected static boolean shutdownFlag = false;
private static ServerSocket serverSocket;
protected ExecutorService executor;
protected static ArrayList<Socket> socketList = new ArrayList<>();

public MulServer_v1(int portNumber, int poolSize) {
}

public void runServer() {

    try {
        serverSocket = new ServerSocket(portNumber);
        executor = Executors.newFixedThreadPool(max_Clients);
    } catch (IOException e) {
        System.out.println("Could not create server on specific port");
        e.printStackTrace();
    }

    while (!shutdownFlag) {
        try {
            Socket clientSocket = serverSocket.accept();
            socketList.add(clientSocket);
            executor.submit(new RequestHandler_v1(clientSocket));

        } catch (IOException e) {
            System.out.println("Couldn't accept on the Socket");
            executor.shutdown();
            e.printStackTrace();

        }

    }
    shutdownAndAwaitTermination();
}

public void shutdownAndAwaitTermination() {
    System.out.println("Shutting down..");
    executor.shutdown(); // Disable new tasks from being submitted
    try {
        // Wait a while for existing tasks to terminate
        if (!executor.awaitTermination(10, TimeUnit.SECONDS)) {
            executor.shutdownNow();
            // Cancel currently executing tasks
            System.out.println("komme ich hierhin?");

            // Wait a while for tasks to respond to being cancelled
            if (!executor.awaitTermination(10, TimeUnit.SECONDS))
                System.err.println("Pool did not terminate");
        }
    } catch (InterruptedException ie) {
        // (Re-)Cancel if current thread also interrupted
        executor.shutdownNow();
        // Preserve interrupt status
        Thread.currentThread().interrupt();
    }
    try {
        serverSocket.close();
    } catch (IOException e) {
        System.out.println("Serversocket konnte nicht geschlossen werden");
        e.printStackTrace();
    }
    System.out.println("I got here!");
    for (Socket s : socketList) {
        if (s != null) {
            try {
                s.close();
            } catch (IOException e) {
                System.out.println("Couldn't close the socket");
                e.printStackTrace();
            }
        }
    }
}

public static void main(String[] args) {
    MulServer_v1 server = new MulServer_v1(portNumber, max_Clients);
    server.runServer();
}

}

 public class Client_v1 {

    public static final String HOSTNAME = "localhost";
    public static final int PORTNUMBER = 8540;
    private static boolean clientClose = false;

    public static void main(String[] args) throws IOException {
        System.out.println("Client started");

        try (Socket socket = new Socket(HOSTNAME, PORTNUMBER);

                PrintWriter out = new PrintWriter(socket.getOutputStream(),
                        true);
                // InputStream test = echoSocket.getInputStream();
                BufferedReader in = new BufferedReader(new InputStreamReader(
                        socket.getInputStream()));
                BufferedReader stdIn = new BufferedReader(
                        new InputStreamReader(System.in))) {
            String userInput;

            while ((userInput = stdIn.readLine()) != null && !clientClose) {
                out.println(userInput);
                System.out.println("echo: " + in.readLine());
                // if (userInput.equals("BYE")) {
                // break;
                // }

            }
        } catch (UnknownHostException e) {
            System.err.println("Don't know about host " + HOSTNAME);
            System.exit(1);
        } catch (IOException e) {
            System.err.println("Couldn't get I/O for the connection to "
                    + HOSTNAME);
            System.exit(1);
        }

    }

    protected static void closeClient() {
        clientClose = true;
    }
}

public class RequestHandler_v1 implements Runnable {
    // private final String password = "passwort";
    private final Socket client;
    private boolean closeFlag = false;

    public RequestHandler_v1(Socket client) {
        this.client = client;
}

@Override
public void run() {
    try (BufferedReader in = new BufferedReader(new InputStreamReader(
            client.getInputStream()));
            BufferedWriter writer = new BufferedWriter(
                    new OutputStreamWriter(client.getOutputStream()));) {
        System.out.println("Thread started with name:"
                + Thread.currentThread().getName());
        String userInput;
        String serverResponse;

        while ((userInput = in.readLine()) != null) {
            serverResponse = processInput(userInput);
            System.out.println("Received message from "
                    + Thread.currentThread().getName() + " : " + userInput);
            writer.write("Sever Response : " + serverResponse);
            writer.newLine();
            writer.flush();
            if (closeFlag) {
                Client_v1.closeClient();
                MulServer_v1.socketList.remove(client);
                client.close();

            }
        }
    } catch (IOException e) {
        System.out.println("I/O exception: " + e);
    } catch (Exception ex) {
        System.out.println("Exception in Thread Run. Exception : " + ex);
    }
}

public String processInput(String input) {
    boolean commandFound = false;
    String output = "";
    try {
        if (input.getBytes("UTF-8").length > 255)
            output = "Max string length exceeded";
    } catch (UnsupportedEncodingException e) {
        // TODO Auto-generated catch block
        e.printStackTrace();
    }
    // Backslashes must be doubled inside a Java string literal ("\\s" is the regex \s)
    Pattern allPattern = Pattern
            .compile("(?<lower>^LOWERCASE\\s.+)|(?<upper>^UPPERCASE\\s.+)|(?<reverse>^REVERSE\\s.+)|(?<bye>^BYE)|(?<shutdown>^SHUTDOWN passwort)");

    Matcher allMatcher = allPattern.matcher(input);
    if (allMatcher.find()) {
        String lower = allMatcher.group("lower");
        String upper = allMatcher.group("upper");
        String reverse = allMatcher.group("reverse");
        String bye = allMatcher.group("bye");
        String shutdown = allMatcher.group("shutdown");
        commandFound = true;
        if (lower != null) {
            output = lower.substring(10).toLowerCase();
        }
        if (upper != null) {
            output = upper.substring(10).toUpperCase();
        }
        if (reverse != null) {
            output = new StringBuilder(reverse.substring(8)).reverse()
                    .toString();
        }
        if (bye != null) {
            output = "BYE";
            closeFlag = true;
        }
        if (shutdown != null) {
            output = "SHUTDOWN";
            MulServer_v1.shutdownFlag = true;
            closeFlag = true;
        }
    } else {
        commandFound = false;
        output = "UNKNOWN COMMAND";
    }

    if (commandFound) {
        output = "OK ".concat(output);
    } else {
        output = "ERROR ".concat(output);

    }
    return output;

}
}

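The shutdown itself works now, but after the shutdown new clients can still connect. How is that possible? This is the console output I used to check:

Shutting down..

Thread started with name:pool-1-thread-3

Received message from pool-1-thread-3 : . // <-- this (sending a message) should NOT be possible, because executor.shutdown() has already been called.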

The problem is that your shutdown signalling is broken:

   while (!shutdownFlag) {
            try {
                Socket clientSocket = serverSocket.accept();
                executor.execute(new RequestHandler_v1(clientSocket));

            } catch (IOException e) {

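accept() is a blocking operation: it blocks until a new connection arrives. That is the culprit. When a client connects, the accepting thread unblocks, submits the task, re-checks the while condition (shutdownFlag is still false at that moment) and blocks in accept() again. Only afterwards does the handler process your "SHUTDOWN" command and set the flag to true, but by then the server is already waiting in accept(), so the pool is never shut down. The next connection attempt wakes the server up: it still accepts and submits that client, then finally sees shutdownFlag, breaks out of the loop and starts the shutdown sequence with its 10-second waits.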
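One common way around this is to close the ServerSocket from the thread that handles the shutdown command, instead of only setting a flag. A thread blocked in accept() then gets a SocketException and can leave the loop. The following is a minimal sketch under assumptions, not your server: the class name AcceptShutdownSketch, the method runDemo and the use of port 0 (any free port) are made up for the demonstration.

```java
import java.io.IOException;
import java.net.ServerSocket;
import java.net.Socket;
import java.net.SocketException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class: shows that closing the ServerSocket unblocks accept()
class AcceptShutdownSketch {

    static boolean runDemo() throws Exception {
        ServerSocket serverSocket = new ServerSocket(0); // port 0 = any free port (demo only)
        CountDownLatch loopExited = new CountDownLatch(1);

        Thread acceptor = new Thread(() -> {
            try {
                while (!serverSocket.isClosed()) {
                    Socket client = serverSocket.accept(); // blocks until a client connects
                    client.close(); // a real server would submit a handler to the pool here
                }
            } catch (SocketException e) {
                // accept() throws this once the server socket has been closed
            } catch (IOException e) {
                e.printStackTrace();
            } finally {
                loopExited.countDown();
            }
        });
        acceptor.start();

        Thread.sleep(200);    // give the acceptor time to block in accept()
        serverSocket.close(); // what a SHUTDOWN handler could do instead of setting a flag

        // true if the accept loop ended within 5 seconds
        return loopExited.await(5, TimeUnit.SECONDS);
    }

    public static void main(String[] args) throws Exception {
        System.out.println("accept loop exited: " + runDemo());
    }
}
```

If you keep a flag as well, it should probably be volatile (or an AtomicBoolean), because it is written by a pool thread and read by the accepting thread.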

Also:

while ((userInput = in.readLine()) != null) {

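is a blocking operation too. It prevents your task from finishing, so the pool never shuts down. readLine() returns null only when the stream ends; if the connection breaks abnormally it throws an IOException instead. You do not end the stream on either side, so the call stays blocked.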
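If closing the socket from outside is not an option, a read timeout is another possibility: Socket.setSoTimeout makes a blocked read throw SocketTimeoutException, which gives the handler a chance to re-check a shutdown flag. This is only a sketch under assumptions: ReadTimeoutSketch, handle, runDemo and shutdownRequested are hypothetical names, and the 200 ms timeout is arbitrary. Note that a timeout in the middle of a partially received line may lose that fragment inside BufferedReader, so a real handler would need more care.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.ServerSocket;
import java.net.Socket;
import java.net.SocketTimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical demo class: a handler loop that wakes up periodically to check a flag
class ReadTimeoutSketch {
    static final AtomicBoolean shutdownRequested = new AtomicBoolean(false);

    /** Returns true if the loop ended because of the flag, false if the peer closed the stream. */
    static boolean handle(Socket client) throws IOException {
        client.setSoTimeout(200); // a blocked read gives up after 200 ms
        try (BufferedReader in = new BufferedReader(
                new InputStreamReader(client.getInputStream()))) {
            while (!shutdownRequested.get()) {
                try {
                    String line = in.readLine();
                    if (line == null) {
                        return false; // end of stream: the peer closed the connection
                    }
                    System.out.println("Received: " + line);
                } catch (SocketTimeoutException e) {
                    // no data within the timeout: fall through and re-check the flag
                }
            }
            return true;
        }
    }

    static boolean runDemo() throws Exception {
        shutdownRequested.set(false);
        try (ServerSocket server = new ServerSocket(0); // any free port (demo only)
             Socket idleClient = new Socket("127.0.0.1", server.getLocalPort());
             Socket accepted = server.accept()) {
            // Simulates the SHUTDOWN command arriving from somewhere else after 500 ms
            Thread stopper = new Thread(() -> {
                try {
                    Thread.sleep(500);
                } catch (InterruptedException ignored) {
                    // demo only
                }
                shutdownRequested.set(true);
            });
            stopper.start();
            return handle(accepted); // idleClient never sends, so only the flag ends the loop
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println("handler stopped by flag: " + runDemo());
    }
}
```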

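ExecutorService#shutdownNow() does not mean that the pool's threads get killed. They are only signalled to terminate: as @PeterLawrey mentioned, typical implementations interrupt the worker threads, and a task has to cooperate, for example by checking Thread.isInterrupted(), in order to terminate gracefully. A thread blocked in a read on a plain socket stream does not react to that interrupt.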
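To illustrate what "signalled" means, here is a small sketch of a cooperative task that ends once its thread is interrupted by shutdownNow(), together with a queued task that is never started. The class name ShutdownNowSketch and the method runDemo are invented for this example; the pool size of 1 is chosen only so that the second task has to wait in the queue.

```java
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class: shutdownNow() interrupts running tasks and returns queued ones
class ShutdownNowSketch {

    static boolean runDemo() throws InterruptedException {
        ExecutorService pool = Executors.newFixedThreadPool(1);
        CountDownLatch started = new CountDownLatch(1);

        pool.submit(() -> {
            started.countDown();
            // Cooperative task: keeps going until its thread is interrupted
            while (!Thread.currentThread().isInterrupted()) {
                try {
                    Thread.sleep(50);
                } catch (InterruptedException e) {
                    // sleep() clears the flag when it throws, so restore it to end the loop
                    Thread.currentThread().interrupt();
                }
            }
        });
        // Queued behind the first task because the pool has a single thread
        pool.submit(() -> System.out.println("never runs"));

        started.await();
        List<Runnable> neverStarted = pool.shutdownNow(); // interrupts the running task
        boolean terminated = pool.awaitTermination(5, TimeUnit.SECONDS);

        return terminated && neverStarted.size() == 1;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("pool terminated cooperatively: " + runDemo());
    }
}
```

A task that ignores the interrupt, or that sits in a blocking socket read, would keep the pool alive past awaitTermination.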

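Proof of concept that closing the socket breaks a blocked I/O operation:

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.ServerSocket;
import java.net.Socket;

public class Buffer {
private static Socket client;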

static class ServerThread extends Thread {
    @Override
    public void run() {
        try {
            ServerSocket serverS = new ServerSocket(1099);
            client = serverS.accept();
            client.getOutputStream().write('a');
            client.getOutputStream().flush();
            Thread.sleep(2000);
            client.close();
        } catch (IOException | InterruptedException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
    }
}

static class ClientThread extends Thread {
    @Override
    public void run() {
        try {
            Thread.sleep(500);
            Socket socket = new Socket("127.0.0.1", 1099);
            BufferedReader input = new BufferedReader(new InputStreamReader(socket.getInputStream()));
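            System.out.println("Will try to read");
            String line = null;
            // readLine() blocks here until a full line arrives or the stream ends
            while ((line = input.readLine()) != null) {
                System.out.println("Read " + line);
            }
            // null was returned: the server closed its side of the connection
            System.out.println("Server closed the connection!");
        } catch (Exception e) {
            System.out.println("Connection failed: " + e);
        }
    }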
}

public static void main(String[] args) throws InterruptedException {
    new ServerThread().start();
    ClientThread t = new ClientThread();
    t.start();
    t.join();

}
}

If you comment out client.close(); the application never ends, just like in your case.
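Applied to a thread pool, the same idea suggests closing the tracked client sockets before waiting for the pool, rather than after the waits. The sketch below is an assumption-laden illustration and not a drop-in fix: OrderedShutdownSketch, runDemo and the clients list are hypothetical, and a single idle client stands in for your connected clients.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class: close client sockets first, then wait for the pool
class OrderedShutdownSketch {

    static boolean runDemo() throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        List<Socket> clients = new CopyOnWriteArrayList<>(); // safe to share between threads

        try (ServerSocket server = new ServerSocket(0); // any free port (demo only)
             Socket idlePeer = new Socket("127.0.0.1", server.getLocalPort())) {

            Socket accepted = server.accept();
            clients.add(accepted);

            pool.submit(() -> {
                try (BufferedReader in = new BufferedReader(
                        new InputStreamReader(accepted.getInputStream()))) {
                    while (in.readLine() != null) {
                        // idlePeer never sends anything, so readLine() just blocks
                    }
                } catch (IOException e) {
                    // expected: the socket was closed during shutdown
                }
            });

            Thread.sleep(200); // let the handler block in readLine()

            pool.shutdown(); // no new tasks
            for (Socket s : clients) {
                s.close();   // a thread blocked in I/O on this socket gets a SocketException
            }
            // true if the handler finished within 5 seconds
            return pool.awaitTermination(5, TimeUnit.SECONDS);
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println("pool terminated: " + runDemo());
    }
}
```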