Java - 以全双工方式编写对套接字输出流阻塞的调用

Java - Write call to socket output stream blocking in full duplex

我正在编写一个客户端服务器应用程序,我想从两个不同的线程(一个线程用于读取,一个用于写入)在一个套接字上进行读写操作。我的系统几乎可以正常工作,但有一个令人费解的错误,我似乎无法解决。读写工作完全相互独立,但是当我开始在一个线程中从 SocketOutputStream 读取时,所有写入不同线程中的 InputStream 的调用都会无限期地阻塞.

我已经编写了一个小测试程序来快速重现问题并尽可能多地消除外部变量。我使用 java.nioServerSocketChannelSocketChannel 来建立连接,我使用 java.ioSocket ([=21= 的底层套接字]) 因为它易于与 ObjectInputStreamObjectOutputStream 一起使用。测试程序设计为运行两次;对于第一个 运行,用户输入 s 来启动服务器,在第二个 运行 中,用户输入 c 到 运行 客户端。

我的问题是: 为什么在 server() 方法中第二次调用 objectOutput.writeObject( message ); 时,下面程序的执行会阻塞? (该方法中倒数第四行)

我在程序代码下方包含了预期输出和实际输出以及我认为它们的含义。

import java.io.IOException;
import java.io.ObjectInput;
import java.io.ObjectInputStream;
import java.io.ObjectOutput;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Scanner;

public class Main {

    private static final String IP_ADDRESS = "localhost";
    private static final int WELL_KNOWN_PORT = 4000;

    public static void main( String... args ) throws Exception {
        Scanner scanner = new Scanner( System.in );
        System.out.print( "choose (s)erver or (c)lient: " );
        char choice = scanner.nextLine().charAt( 0 );
        switch ( choice ) {
        case 's':
            server();
            break;
        case 'c':
            client();
            break;
        default:
            break;
        }
        scanner.close();
    }

    private static void server() throws Exception {

        // initialize connection

        ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
        serverSocketChannel.bind( new InetSocketAddress( WELL_KNOWN_PORT ) );
        System.out.println( "waiting for client to connect" );
        SocketChannel socketChannel = serverSocketChannel.accept();
        System.out.println( "client connected" );
        socketChannel.configureBlocking( true );
        while ( !socketChannel.finishConnect() )
            Thread.sleep( 100 );
        Socket socket = socketChannel.socket();
        ObjectOutput objectOutput = new ObjectOutputStream( socket.getOutputStream() );

        // write first object to stream

        Message message = new Message( 1 );
        System.out.println( "writing first object to object output stream: " + message );
        objectOutput.writeObject( message );
        System.out.println( "first object written to object output stream" );
        objectOutput.flush();
        System.out.println( "object output stream flushed" );

        // start reading in a separate thread

        new Thread( () -> {
            ObjectInput objectInput = null;
            try {
                objectInput = new ObjectInputStream( socket.getInputStream() );
            } catch ( IOException e ) {
                e.printStackTrace();
            }
            Message messageIn = null;
            try {
                System.out.println( "reading on object input stream" );
                messageIn = (Message) objectInput.readObject();
                System.out.println( "read object on object input stream: " + messageIn );
            } catch ( ClassNotFoundException | IOException e ) {
                e.printStackTrace();
            }
            System.out.println( messageIn );
        } ).start();
        Thread.sleep( 100 ); // allow time for object listening to start

        // write second object to stream

        message = new Message( 2 );
        System.out.println( "writing second object to object output stream: " + message );
        objectOutput.writeObject( message ); // this call seems to block??
        System.out.println( "second object written to object output stream" );
        objectOutput.flush();
        System.out.println( "object output stream flushed" );
    }

    private static void client() throws Exception {

        // initialize connection

        SocketChannel socketChannel = SocketChannel.open();
        socketChannel.configureBlocking( true );
        socketChannel.connect( new InetSocketAddress( IP_ADDRESS, WELL_KNOWN_PORT ) );
        while ( !socketChannel.finishConnect() )
            Thread.sleep( 100 );
        Socket socket = socketChannel.socket();
        ObjectOutput objectOutput = new ObjectOutputStream( socket.getOutputStream() );
        ObjectInput objectInput = new ObjectInputStream( socket.getInputStream() );

        // read first object

        System.out.println( "reading first object on object input stream" );
        Message message = (Message) objectInput.readObject();
        System.out.println( "read first object on object input stream: " + message );

        // read second object

        System.out.println( "reading second object on object input stream" );
        message = (Message) objectInput.readObject();
        System.out.println( "read second object on object input stream: " + message );

        // write confirmation message

        message = new Message( 42 );
        System.out.println( "writing confirmation message to object output stream: " + message );
        objectOutput.writeObject( message );
        System.out.println( "confirmation message written to object output stream" );
        objectOutput.flush();
        System.out.println( "object output stream flushed" );
    }

    private static class Message implements Serializable {

        private static final long serialVersionUID = 5649798518404142034L;
        private int data;

        public Message( int data ) {
            this.data = data;
        }

        @Override
        public String toString() {
            return "" + data;
        }
    }
}

服务器

预期输出:

choose (s)erver or (c)lient: s
waiting for client to connect
client connected
writing first object to object output stream: 1
first object written to object output stream
object output stream flushed
reading on object input stream
writing second object to object output stream: 2
second object written to object output stream
object output stream flushed
read object on object input stream: 42

实际输出:

choose (s)erver or (c)lient: s
waiting for client to connect
client connected
writing first object to object output stream: 1
first object written to object output stream
object output stream flushed
reading on object input stream
writing second object to object output stream: 2

应用程序成功发送了第一个对象,但在第二个对象上无限期地阻塞。我能看到的唯一区别是,第二次写入调用是在单独线程上进行读取操作时发生的。我的第一直觉是 Socket 可能不支持不同线程的同时读写,但我对 Stack Overflow 的搜索表明它们确实支持这种同时操作(全双工)。这是我对上面代码的操作感到困惑的主要原因。

客户端

预期输出:

choose (s)erver or (c)lient: c
reading on object input stream
read first object on object input stream: 1
reading second object on object input stream
read second object on object input stream: 2
writing confirmation message to object output stream: 42
confirmation message written to object output stream
object output stream flushed

实际输出:

choose (s)erver or (c)lient: c
reading first object on object input stream
read first object on object input stream: 1
reading second object on object input stream

这确认客户端已成功发送和接收第一个对象。由于服务器中的这种奇怪的阻塞行为,客户端似乎正在等待服务器从未发送的第二个对象。

非常感谢任何人可以提供的任何建议。如果可以通过另一种方式轻松实现全双工,我愿意重写我的代码,但是如果有使用上述结构的解决方案,我更愿意坚持使用它,因为不必重构大部分代码。

这段代码有很多错误,我将不得不逐行查看:

private static void server() throws Exception {

        // initialize connection

        ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
        serverSocketChannel.bind( new InetSocketAddress( WELL_KNOWN_PORT ) );
        System.out.println( "waiting for client to connect" );

        SocketChannel socketChannel = serverSocketChannel.accept();

上面没有什么'initializes a connection'。客户端初始化连接。此代码接受它。

        System.out.println( "client connected" );
        socketChannel.configureBlocking( true );

这是默认设置。您不需要断言默认值。

        while ( !socketChannel.finishConnect() )
            Thread.sleep( 100 );

你不应该这样称呼。 finishConnect() 适用于在 非阻塞 模式下调用 connect() 客户端 。你是一个服务器,你没有调用 connect(),你也没有处于非阻塞模式。如果你 非阻塞模式的客户端,你不应该在睡眠循环中调用它:你应该使用 Selector.select()OP_CONNECT.

        Socket socket = socketChannel.socket();
        ObjectOutput objectOutput = new ObjectOutputStream( socket.getOutputStream() );

当你使用阻塞模式和输出流时,根本看不出你为什么使用 ServerSocketChannelSocketChannel,事实上这至少是问题。一个鲜为人知的事实是,从 NIO 通道派生的流在通道上使用同步进行读取和写入,因此它们根本不是全双工的,即使底层 TCP 连接是全双工的。删除所有这些并使用 ServerSocketSocket.

重写
        // write first object to stream

        Message message = new Message( 1 );
        System.out.println( "writing first object to object output stream: " + message );
        objectOutput.writeObject( message );
        System.out.println( "first object written to object output stream" );
        objectOutput.flush();
        System.out.println( "object output stream flushed" );

        // start reading in a separate thread

        new Thread( () -> {
            ObjectInput objectInput = null;
            try {
                objectInput = new ObjectInputStream( socket.getInputStream() );
            } catch ( IOException e ) {
                e.printStackTrace();
            }

不要这样写代码。像下面这样的代码取决于像上面那样的先前 try 块的成功必须在那个 try 块内。否则,例如以下代码可能会得到 NullPointerExceptions.

            Message messageIn = null;
            try {
                System.out.println( "reading on object input stream" );
                messageIn = (Message) objectInput.readObject();
                System.out.println( "read object on object input stream: " + messageIn );
            } catch ( ClassNotFoundException | IOException e ) {
                e.printStackTrace();
            }

同上。

            System.out.println( messageIn );
        } ).start();
        Thread.sleep( 100 ); // allow time for object listening to start

        // write second object to stream

        message = new Message( 2 );
        System.out.println( "writing second object to object output stream: " + message );
        objectOutput.writeObject( message ); // this call seems to block??
        System.out.println( "second object written to object output stream" );
        objectOutput.flush();
        System.out.println( "object output stream flushed" );
    }

参见上文,了解为什么在从 NIO 通道派生的流中无法在单独的线程中执行此操作。

    private static void client() throws Exception {

        // initialize connection

        SocketChannel socketChannel = SocketChannel.open();
        socketChannel.configureBlocking( true );
        socketChannel.connect( new InetSocketAddress( IP_ADDRESS, WELL_KNOWN_PORT ) );
        while ( !socketChannel.finishConnect() )
            Thread.sleep( 100 );

上面最后两行没有意义,因为连接 已经 完成,因为你处于阻塞模式。

        Socket socket = socketChannel.socket();
        ObjectOutput objectOutput = new ObjectOutputStream( socket.getOutputStream() );
        ObjectInput objectInput = new ObjectInputStream( socket.getInputStream() );

        // read first object

        System.out.println( "reading first object on object input stream" );
        Message message = (Message) objectInput.readObject();
        System.out.println( "read first object on object input stream: " + message );

        // read second object

        System.out.println( "reading second object on object input stream" );
        message = (Message) objectInput.readObject();
        System.out.println( "read second object on object input stream: " + message );

        // write confirmation message

        message = new Message( 42 );
        System.out.println( "writing confirmation message to object output stream: " + message );
        objectOutput.writeObject( message );
        System.out.println( "confirmation message written to object output stream" );
        objectOutput.flush();
        System.out.println( "object output stream flushed" );
    }

您可以按原样使用其余部分,但 NIO 通道在这里毫无意义。你也可以使用 Socket.