ThreadPoolExecutor.shutdownNow() 没有在 Thread 中抛出 InterruptedException

ThreadPoolExecutor.shutdownNow() not throwing InterruptedException in Thread

我正在实施一个传输服务器程序,该程序从客户端(通过控制台输入)获取消息,然后将其转发到某种邮箱。

为了允许不同客户端同时接收多条消息,我首先创建了一个实现 Runnable 接口的 class。每个 class 个实例都将处理与一个客户端的通信:

public class ClientConnection implements Runnable {

    //...

    //...

    @Override
    public void run() {
        try {
            // prepare the input reader and output writer
            BufferedReader reader = new BufferedReader(new InputStreamReader(clientSocket.getInputStream()));
            PrintWriter writer = new PrintWriter(clientSocket.getOutputStream(), true);

            Message message = new Message();
            String request = "";

            // read client requests
            while ((request = reader.readLine()) != null) {

                System.out.println("Client sent the following request: " + request);
                String response;
                if (request.trim().equals("quit")) {
                    writer.println("ok bye");
                    return;
                }

                response = message.parseRequest(request);
                if (message.isCompleted()) {
                    messagesQueue.put(message);
                    message = new Message();
                }
                writer.println(response);
            }

        } catch (SocketException e) {
            System.out.println("ClientConnection: SocketException while handling socket: " + e.getMessage());
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (InterruptedException e) {
            System.out.println("Client Connection was interrupted!");
            e.printStackTrace();
        } finally {
            if (clientSocket != null && !clientSocket.isClosed()) {
                try {
                    clientSocket.close();
                } catch (IOException ignored) {}
            }

        }

    }
}

我有一个父线程负责启动和管理所有 ClientConnection runnables:

@Override
public void run() {

    clientConnectionExecutor = (ThreadPoolExecutor) Executors.newCachedThreadPool();
    while (true) {

        Socket clientSocket;

        try {
            // wait for a Client to connect
            clientSocket = serverSocket.accept();

            ClientConnection clientConnection = new ClientConnection(clientSocket, messagesQueue);
            clientConnectionExecutor.execute(clientConnection);

        } catch (IOException e) {
            // when this exception occurs, it means that we want to shut down everything
            clientConnectionExecutor.shutdownNow();  // force terminate all ClientConnections
            return;
        }
    }
}

现在根据 this Whosebug Question,我预计一旦调用 shutdownNow();,就会在我的 ClientConnection.run() 方法中抛出一个 InterruptedException,并且在那里,它应该打印 Client Connection was interrupted!。但这并没有发生,所以 catch 子句似乎永远不会到达,输入读取循环一直在继续。

我在另一个 Whosebug 问题中读到,这可能与该块中的某些其他代码行有关,似乎正在消耗 InterruptedException,但没有任何关于什么代码行可以做到这一点的具体信息。所以我很感谢任何提示。

编辑:事实证明,只要我通过在客户端上键入“quit”手动退出循环,循环就会退出,然后会打印 Client Connection was interrupted!。所以不知何故,只要循环是运行,异常似乎就会被忽略,并且只在之后处理。

来自 shutdownNow 的 Oracle 文档:

There are no guarantees beyond best-effort attempts to stop processing actively executing tasks. For example, typical implementations will cancel via Thread.interrupt(), so any task that fails to respond to interrupts may never terminate.

如果您查看 ThreadPoolExecutor 来源,您会发现 shutdownNow 使用以下代码中断线程:

        void interruptIfStarted() {
            Thread t;
            if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
                try {
                    t.interrupt();
                } catch (SecurityException ignore) {
                }
            }
        }

您的 ClientConnection 没有检查标志 Thread.interrupted。由于 post 中的信息,我无法弄清楚哪个方法抛出 InterruptedException。可能是其他一些方法,例如 reader 或 writer 的 readLine 阻塞了线程,因为它们使用套接字的 InputStreamOutputStream 并且因为很明显套接字的流阻塞如果数据不是立即可用的线程。

例如,我写了这段代码来测试它:

class Example {
    public static void main(String[] args) {
        Thread thread = new Thread(() -> {
            try(ServerSocket serverSocket = new ServerSocket()) {
                serverSocket.bind(new InetSocketAddress(8080));
                Socket socket = serverSocket.accept();
                int dataByte = socket.getInputStream().read();
                System.out.println(dataByte);
            } catch (IOException e) {
                e.printStackTrace();
            }
        });
        thread.start();
        thread.interrupt();
    }
}

在 OpenJdk-16.0.2 上没有实际中断。

我看到两个可能的解决方案来解决您的问题:

  1. 如果您确定 Socket 不会阻塞您的线程,请检查 while 循环中的 Thread.interrupted

  2. 如果您不确定,请在非阻塞模式下使用 SocketChannel 而不是 Socket 手动检查 Thread.interrupted

对于第二种方式,我将示例转换为:

class Example {
    public static void main(String[] args) {
        Thread thread = new Thread(() -> {
            try(ServerSocketChannel serverSocket = ServerSocketChannel.open()) {
                serverSocket.configureBlocking(false);
                serverSocket.bind(new InetSocketAddress(8080));

                SocketChannel socket = null;

                while (socket == null) {
                    socket = serverSocket.accept();

                    if (Thread.interrupted()) {
                        throw new InterruptedException();
                    }
                }

                ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
                socket.read(byteBuffer);
                byte[] bytes = new byte[byteBuffer.limit()];
                byteBuffer.flip();
                byteBuffer.get(bytes);
                System.out.println(new String(bytes, StandardCharsets.UTF_8));
            } catch (IOException e) {
                e.printStackTrace();
            } catch (InterruptedException e) {
                System.out.println("Interrupted successfully");
            }
        });
        thread.start();
        thread.interrupt();
    }
}

有效。

祝你好运 Java :)

I would have expected that as soon as shutdownNow(); is being called, an InterruptedException would be thrown within my ClientConnection.run()

您的 messagesQueue 应该是 BlockingQueue。所以 messagesQueue.put(message) 会让你需要捕获一个 Interrupted 异常。所以只有当线程阻塞在put方法中(queue已满),你调用threadpool#shutdownNow,线程才会收到Interrupted异常。在其他情况下,线程不会收到此 Interrupted 异常。

您可以将 while ((request = reader.readLine()) != null) 更改为 while ((request = reader.readLine()) != null && !Thread.interrupted())

另一种解决方案是维护所有客户端套接字,并在需要关闭它们时关闭所有客户端套接字,这样客户端线程将直接收到一个IOException:

        List<Socket> clientSockets = new ArrayList<>();
        while (true) {
            try {
                Socket accept = serverSocket.accept();
                clientSockets.add(accept);
                executorService.submit(new ClientConnection(accept));
            }catch (Exception e) {
                for (Socket socket : clientSockets) {
                    try {
                        socket.close();
                    } catch (Exception exception) {
                        //
                    }
                }
                //executorService.shutdownNow();
            }
        }