套接字通道。无效流 header:00000000

SocketChannel. invalid stream header: 00000000

我想序列化 'Message' 对象,并且可以通过 SocketChannel 成功地将它作为字节数组传输。之后,我更改了该对象的属性(这可能使其体积变大),然后在将对象发送回客户端时出现了问题。一旦我尝试在客户端获取该对象,就会抛出异常;异常发生在 getResponse() 方法中反序列化 Message 对象的时候:

org.apache.commons.lang3.SerializationException: java.io.StreamCorruptedException: invalid stream header: 00000000

但是,不知何故,这只发生在第一个客户端上(抛出异常后,与第一个客户端的连接结束);当我在不关闭服务器的情况下启动一个新客户端时,就可以成功地来回传输对象,而且之后的任何新客户端都能正常工作。

这是我的最小可调试版本:

import org.apache.commons.lang3.SerializationUtils;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;
import java.util.Scanner;

/**
 * Console client that sends serialized {@code Message} objects to a server on
 * {@code localhost:5454} over a {@link SocketChannel} and prints the reply.
 *
 * <p>This is the question's reproduction case; the review notes below mark the
 * statements that are relevant to the reported
 * {@code StreamCorruptedException: invalid stream header: 00000000}.
 */
public class Client {

    /** Channel connected to the server; assigned in {@link #start()}. */
    private SocketChannel server;

    /**
     * Connects to the server, then repeatedly prompts for a request string and a
     * number, sending both to the server until the request {@code "exit"} is entered.
     *
     * @throws IOException if closing the channel in {@link #stop()} fails
     */
    public void start() throws IOException {
        try {
            server = SocketChannel.open(new InetSocketAddress("localhost", 5454));
            // NOTE(review): the channel is switched to non-blocking mode here, so a later
            // read() may return 0 when the server has not answered yet.
            server.configureBlocking(false);
        } catch (IOException e) {
            System.err.println("Server isn't responding");
            System.exit(0);
        }

        // Two independent Scanners are created over the same System.in stream.
        Scanner scRequest = new Scanner(System.in);
        Scanner scState = new Scanner(System.in);


        System.out.println("Enter request:");
        String request = scRequest.nextLine();

        while (!request.equals("exit")) {
            try {
                // In my actual project class Person is a way different (But it's still a POJO)
                // I included it here to make sure I can get it back after sending to the server
                System.out.println("Enter a number:");
                Person person = new Person(scState.nextInt());
                sendRequest(request, person);

                System.out.println("\nEnter request:");
                request = scRequest.nextLine();
            } catch (Exception e) {
                // The stack trace is printed and the loop continues with the same request.
                e.printStackTrace();
            }
        }

        stop();
    }

    /**
     * Serializes a new {@code Message} built from the arguments, writes it to the
     * server and then reads the response. Any exception terminates the JVM.
     *
     * @param sMessage command text placed in the message
     * @param person   object attached to the message
     */
    public void sendRequest(String sMessage, Person person) {
        Message message = new Message(sMessage, person);
        ByteBuffer requestBuffer = ByteBuffer.wrap(SerializationUtils.serialize(message));
        try {
            // A single write() call; its return value (bytes written) is not checked.
            server.write(requestBuffer);
            requestBuffer.clear();
            getResponse();
        } catch (Exception e) {
            // Only the exception message is printed here, not the stack trace.
            System.out.println(e.getMessage());
            System.err.println("Connection lost");
            System.exit(0);
        }
    }

    /**
     * Performs one read from the server, deserializes the buffer contents into a
     * {@code Message} and prints it.
     *
     * @throws Exception if the channel reports end-of-stream, or if reading or
     *                   deserialization fails
     */
    public void getResponse() throws Exception {
        // A 64 MiB buffer is allocated for every response.
        ByteBuffer responseBuffer = ByteBuffer.allocate(1024 * 1024 * 64);

        int read = server.read(responseBuffer);
        // clear() sets position to 0 and limit to capacity, so the buffer no longer
        // records how many bytes were just read.
        responseBuffer.clear();
        if(read == -1) {
            throw new Exception();
        }

        // After clear(), limit() equals the capacity: the whole 64 MiB buffer is copied,
        // regardless of the value of 'read'.
        byte[] bytes = new byte[responseBuffer.limit()];
        responseBuffer.get(bytes);

        // NOTE(review): if the non-blocking read returned 0, 'bytes' holds only the zeros
        // of the freshly allocated buffer - presumably the source of
        // "invalid stream header: 00000000"; confirm by logging 'read'.
        Message message = SerializationUtils.deserialize(bytes);
        System.out.println(message);
    }

    /**
     * Closes the channel to the server.
     *
     * @throws IOException if closing the channel fails
     */
    public void stop() throws IOException {
        server.close();
    }

    /** Entry point: creates a client and starts its prompt loop. */
    public static void main(String[] args) throws IOException {
        Client client = new Client();
        client.start();
    }
}
import org.apache.commons.lang3.SerializationUtils;

import java.io.*;

import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Set;

/**
 * Selector-based server on {@code localhost:5454} that reads a serialized
 * {@code Message} from a client, sets its result and writes it back.
 *
 * <p>This is the question's original version; the review notes mark the
 * statements relevant to the buffer handling discussed in the answer.
 */
public class Server {

    /**
     * Opens the listening channel, registers it for accept events and runs the
     * selector loop forever.
     *
     * @throws IOException if opening, binding, selecting or accepting fails
     */
    public void start() throws IOException {

        Selector selector = Selector.open();
        ServerSocketChannel serverSocket = ServerSocketChannel.open();
        serverSocket.bind(new InetSocketAddress("localhost", 5454));
        serverSocket.configureBlocking(false);
        serverSocket.register(selector, SelectionKey.OP_ACCEPT);

        System.out.println("Server started");

        while (true) {
            // Blocks until at least one registered channel is ready.
            selector.select();
            Set<SelectionKey> selectedKeys = selector.selectedKeys();
            Iterator<SelectionKey> iter = selectedKeys.iterator();
            while (iter.hasNext()) {
                SelectionKey key = iter.next();
                if (key.isAcceptable()) {
                    register(selector, serverSocket);
                }
                if (key.isReadable()) {
                    try {
                        getRequest(key);
                    } catch (Exception e) {
                        // Only the message is printed; the client channel is not closed here.
                        System.err.println(e.getMessage());
                    }
                }
                // Selected keys must be removed manually once handled.
                iter.remove();
            }
        }
    }

    /**
     * Performs one read from the client behind {@code key}, deserializes the buffer
     * contents into a {@code Message} and sends the response.
     *
     * @param key selection key whose channel is a readable {@link SocketChannel}
     * @throws Exception if the client has disconnected, or if reading,
     *                   deserialization or writing fails
     */
    private void getRequest(SelectionKey key) throws Exception {
        SocketChannel client = (SocketChannel) key.channel();

        ByteBuffer requestBuffer = ByteBuffer.allocate(1024 * 1024);
        int read = client.read(requestBuffer);
        // clear() sets position to 0 and limit to capacity, so the number of bytes just
        // read is no longer reflected in the buffer state.
        requestBuffer.clear();

        if(read == -1) {
            // End of stream: deregister the key and report the disconnect via an exception.
            key.cancel();
            throw new Exception("Client disconnected at: " +
                    ((SocketChannel) key.channel()).socket().getRemoteSocketAddress());
        }

        // After clear(), limit() equals the capacity: the whole 1 MiB buffer is copied,
        // i.e. the received bytes followed by the zeros left from allocation.
        byte[] bytes = new byte[requestBuffer.limit()];
        requestBuffer.get(bytes);

        Message message = SerializationUtils.deserialize(bytes);
        sendResponse(client, message);
    }

    /**
     * Sets a fixed result on {@code message}, serializes it and writes it to the client.
     *
     * @param client  channel to write the response to
     * @param message message to update and send back
     * @throws IOException if writing to the channel fails
     */
    private void sendResponse(SocketChannel client, Message message) throws IOException {

        message.setResult("Some result");

        ByteBuffer responseBuffer = ByteBuffer.wrap(SerializationUtils.serialize(message));
        // Repeats write() until the whole buffer has been handed to the channel.
        while (responseBuffer.hasRemaining()) {
            client.write(responseBuffer);
        }
        responseBuffer.clear();
    }

    /**
     * Accepts a pending connection, makes it non-blocking and registers it for reads.
     *
     * @param selector     selector the new client channel is registered with
     * @param serverSocket listening channel to accept from
     * @throws IOException if accepting or configuring the channel fails
     */
    private void register(Selector selector, ServerSocketChannel serverSocket) throws IOException {
        SocketChannel client = serverSocket.accept();
        client.configureBlocking(false);
        client.register(selector, SelectionKey.OP_READ);
        System.out.println("New client at: " + client.socket().getRemoteSocketAddress());
    }

    /** Entry point: creates a server and starts its selector loop. */
    public static void main(String[] args) throws Exception {
        new Server().start();
    }
}

我尝试将此 object 作为字节数组发送:

import java.io.Serializable;
import java.util.Formatter;

/**
 * Serializable request/response envelope exchanged between client and server.
 * Carries a command, an attached {@code Person} and an optional result text.
 */
public class Message implements Serializable {

    /** Command text supplied when the message is created. */
    private String command;
    /** Object attached to the command. */
    private Person person;
    /** Result text; stays {@code null} until {@link #setResult(String)} is called. */
    private String result;

    /**
     * Creates a message without a result.
     *
     * @param command command text
     * @param person  object attached to the command
     */
    public Message(String command, Person person) {
        this.command = command;
        this.person = person;
    }

    /** @return the command text */
    public String getCommand() {
        return this.command;
    }

    /** @param executedCommand new command text */
    public void setCommand(String executedCommand) {
        this.command = executedCommand;
    }

    /** @return the attached object */
    public Person getPerson() {
        return this.person;
    }

    /** @param person new attached object */
    public void setPerson(Person person) {
        this.person = person;
    }

    /** @return the result text, or {@code null} if none has been set */
    public String getResult() {
        return this.result;
    }

    /** @param result new result text */
    public void setResult(String result) {
        this.result = result;
    }

    /** Renders the command, the attached object and the result on three lines. */
    @Override
    public String toString() {
        final String layout = "Command: %s\nAttached object: %s\nResult: %s";
        return String.format(layout, command, person, result);
    }
}

我在 Message 对象中包含了这个类的实例:

/**
 * Minimal serializable payload holding a single immutable integer state.
 */
public class Person implements Serializable {
    /** Value supplied at construction time. */
    private final int state;

    /**
     * @param state value to store
     */
    public Person(int state) {
        this.state = state;
    }

    /** Returns the fixed label followed by the stored state. */
    @Override
    public String toString() {
        StringBuilder text = new StringBuilder("Person state: ");
        text.append(state);
        return text.toString();
    }
}

我不知道怎么回事,希望得到你的帮助。

UPD:我使用 'org.apache.commons:commons-lang3:3.5' 依赖项将 object 序列化为字节数组

我以前从未使用过Java NIO通道,所以我不是专家。但我发现了几件事:

一般:

  • 为了调试您的代码,使用 e.printStackTrace() 而不是 System.out.println(e.getMessage()) 会很有帮助。

客户:

  • 客户端的 SocketChannel server 应该配置为阻塞模式,否则可能因为服务器尚未响应而读取到 0 字节,这正是导致您问题的原因。
  • 您应该始终在读取之前调用 ByteBuffer.clear(),而不是在读取之后。
  • 读取之后、调用 get(byte[]) 之前,必须通过 responseBuffer.position(0) 将字节缓冲区的位置重置为 0,否则读到的将是刚读入数据之后的未定义字节。
  • 您应该根据实际读取的字节数而不是字节缓冲区的大小来确定字节数组的大小。反过来做也许能工作,但效率低下。

服务器:

  • 您应该始终在读取之前调用 ByteBuffer.clear(),而不是在读取之后。
  • 读取之后、调用 get(byte[]) 之前,必须通过 requestBuffer.position(0) 将字节缓冲区的位置重置为 0,否则读到的将是刚读入数据之后的未定义字节。
  • 在 getRequest(key) 调用期间捕获到异常时,您应该关闭相应的通道,否则在客户端断开连接后,服务器会无限期地尝试从中读取,并不断向控制台日志输出错误消息。我的修改处理了这种情况,还会打印一条清晰的日志消息,说明哪个客户端(远程套接字地址)已被关闭。

警告:您的代码没有处理写入通道的请求或响应大于另一端 ByteBuffer 最大容量的情况。同样,理论上,(反)序列化后的 byte[] 最终也可能比字节缓冲区更大。

这是我的差异:

Index: src/main/java/de/scrum_master/stackoverflow/q65890087/Client.java
===================================================================
--- a/src/main/java/de/scrum_master/stackoverflow/q65890087/Client.java (revision Staged)
+++ b/src/main/java/de/scrum_master/stackoverflow/q65890087/Client.java (date 1612321383172)
@@ -15,7 +15,7 @@
   public void start() throws IOException {
     try {
       server = SocketChannel.open(new InetSocketAddress("localhost", 5454));
-      server.configureBlocking(false);
+      server.configureBlocking(true);
     }
     catch (IOException e) {
       System.err.println("Server isn't responding");
@@ -56,22 +56,24 @@
       getResponse();
     }
     catch (Exception e) {
-      System.out.println(e.getMessage());
+      e.printStackTrace();
+//      System.out.println(e.getMessage());
       System.err.println("Connection lost");
       System.exit(0);
     }
   }
 
   public void getResponse() throws Exception {
-    ByteBuffer responseBuffer = ByteBuffer.allocate(1024 * 1024 * 64);
+    ByteBuffer responseBuffer = ByteBuffer.allocate(1024 * 1024);
+    responseBuffer.clear();
 
     int read = server.read(responseBuffer);
-    responseBuffer.clear();
     if (read == -1) {
-      throw new Exception();
+      throw new Exception("EOF, cannot read server response");
     }
 
-    byte[] bytes = new byte[responseBuffer.limit()];
+    byte[] bytes = new byte[read];
+    responseBuffer.position(0);
     responseBuffer.get(bytes);
 
     Message message = SerializationUtils.deserialize(bytes);
Index: src/main/java/de/scrum_master/stackoverflow/q65890087/Server.java
===================================================================
--- a/src/main/java/de/scrum_master/stackoverflow/q65890087/Server.java (revision Staged)
+++ b/src/main/java/de/scrum_master/stackoverflow/q65890087/Server.java (date 1612323386278)
@@ -35,7 +35,11 @@
             getRequest(key);
           }
           catch (Exception e) {
-            System.err.println(e.getMessage());
+            e.printStackTrace();
+//            System.err.println(e.getMessage());
+            SocketChannel client = (SocketChannel) key.channel();
+            System.err.println("Closing client connection at: " + client.socket().getRemoteSocketAddress());
+            client.close();
           }
         }
         iter.remove();
@@ -45,15 +49,16 @@
 
   private void getRequest(SelectionKey key) throws Exception {
     SocketChannel client = (SocketChannel) key.channel();
-    ByteBuffer requestBuffer = ByteBuffer.allocate(1024 * 1024 * 64);
+    ByteBuffer requestBuffer = ByteBuffer.allocate(1024 * 1024);
+    requestBuffer.clear();
     int read = client.read(requestBuffer);
-    requestBuffer.clear();
     if (read == -1) {
       key.cancel();
       throw new Exception("Client disconnected at: " +
         ((SocketChannel) key.channel()).socket().getRemoteSocketAddress());
     }
-    byte[] bytes = new byte[requestBuffer.limit()];
+    byte[] bytes = new byte[read];
+    requestBuffer.position(0);
     requestBuffer.get(bytes);
     Message message = SerializationUtils.deserialize(bytes);
     sendResponse(client, message);

为了完整起见,这里是我更改后的完整 类:

import org.apache.commons.lang3.SerializationUtils;

import java.io.EOFException;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.Channels;
import java.nio.channels.SocketChannel;
import java.util.Scanner;

/**
 * Console client that sends serialized {@code Message} objects to the server on
 * {@code localhost:5454} over a blocking {@link SocketChannel} and prints each reply.
 */
public class Client {

  /** Blocking channel connected to the server; assigned in {@link #start()}. */
  private SocketChannel server;

  /**
   * Connects to the server, then repeatedly prompts for a request string and a
   * number and sends both, until {@code "exit"} is entered or standard input ends.
   *
   * @throws IOException if closing the channel in {@link #stop()} fails
   */
  public void start() throws IOException {
    try {
      server = SocketChannel.open(new InetSocketAddress("localhost", 5454));
      // Blocking mode: reads wait for the server's answer instead of returning 0 bytes.
      server.configureBlocking(true);
    }
    catch (IOException e) {
      System.err.println("Server isn't responding");
      System.exit(0);
    }

    // A single line-based Scanner: two Scanners over the same System.in can buffer
    // (and thereby steal) each other's input.
    Scanner console = new Scanner(System.in);

    System.out.println("Enter request:");
    while (console.hasNextLine()) {
      String request = console.nextLine();
      if (request.equals("exit")) {
        break;
      }

      // In my actual project class Person is a way different (But it's still a POJO)
      // I included it here to make sure I can get it back after sending to the server
      Integer state = readState(console);
      if (state == null) {
        // Standard input ended before a number was entered.
        break;
      }
      sendRequest(request, new Person(state));

      System.out.println("\nEnter request:");
    }

    stop();
  }

  /**
   * Prompts for a number until a valid integer line is entered.
   *
   * <p>Reading whole lines (instead of {@code nextInt()}) guarantees that an invalid
   * entry is consumed, so a typo cannot cause an endless retry loop.
   *
   * @param console scanner over standard input
   * @return the parsed number, or {@code null} if standard input ended first
   */
  private static Integer readState(Scanner console) {
    System.out.println("Enter a number:");
    while (console.hasNextLine()) {
      String line = console.nextLine().trim();
      try {
        return Integer.valueOf(line);
      }
      catch (NumberFormatException e) {
        System.err.println("Not a valid number: " + line);
        System.out.println("Enter a number:");
      }
    }
    return null;
  }

  /**
   * Serializes a new {@code Message} built from the arguments, writes it completely
   * to the server and then reads the response. Any failure terminates the JVM.
   *
   * @param sMessage command text placed in the message
   * @param person   object attached to the message
   */
  public void sendRequest(String sMessage, Person person) {
    Message message = new Message(sMessage, person);
    ByteBuffer requestBuffer = ByteBuffer.wrap(SerializationUtils.serialize(message));
    try {
      // write() is allowed to transfer fewer bytes than remain; loop until all are sent.
      while (requestBuffer.hasRemaining()) {
        server.write(requestBuffer);
      }
      getResponse();
    }
    catch (Exception e) {
      e.printStackTrace();
      System.err.println("Connection lost");
      System.exit(0);
    }
  }

  /**
   * Reads exactly one serialized {@code Message} from the server and prints it.
   *
   * <p>The object stream is read directly from the blocking channel, so a response
   * that arrives in several TCP segments, or is larger than any fixed buffer, is
   * still received completely.
   *
   * @throws Exception if the server closed the connection ({@link EOFException}),
   *                   or if reading or deserialization fails
   */
  public void getResponse() throws Exception {
    Message message;
    try {
      // Deliberately not closed: closing the stream would close the underlying channel.
      // NOTE(review): Java-native deserialization of data received from the network is
      // only acceptable when the server is trusted.
      ObjectInputStream in = new ObjectInputStream(Channels.newInputStream(server));
      message = (Message) in.readObject();
    }
    catch (EOFException e) {
      EOFException eof = new EOFException("EOF, cannot read server response");
      eof.initCause(e);
      throw eof;
    }

    System.out.println(message);
  }

  /**
   * Closes the channel to the server.
   *
   * @throws IOException if closing the channel fails
   */
  public void stop() throws IOException {
    server.close();
  }

  /** Entry point: creates a client and starts its prompt loop. */
  public static void main(String[] args) throws IOException {
    Client client = new Client();
    client.start();
  }
}
import org.apache.commons.lang3.SerializationUtils;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Set;

/**
 * Selector-based server on {@code localhost:5454} that reads a serialized
 * {@code Message} from a client, sets its result and writes it back.
 */
public class Server {
  /**
   * Opens the listening channel, registers it for accept events and runs the
   * selector loop forever. A failure while serving one client closes only that
   * client's connection.
   *
   * @throws IOException if opening, binding, selecting or accepting fails
   */
  public void start() throws IOException {
    Selector selector = Selector.open();
    ServerSocketChannel serverSocket = ServerSocketChannel.open();
    serverSocket.bind(new InetSocketAddress("localhost", 5454));
    serverSocket.configureBlocking(false);
    serverSocket.register(selector, SelectionKey.OP_ACCEPT);
    System.out.println("Server started");

    while (true) {
      selector.select();
      Set<SelectionKey> selectedKeys = selector.selectedKeys();
      Iterator<SelectionKey> iter = selectedKeys.iterator();
      while (iter.hasNext()) {
        SelectionKey key = iter.next();
        // Remove first so the key is cleared from the selected set on every path.
        iter.remove();
        if (!key.isValid()) {
          // Querying readiness on a cancelled key throws CancelledKeyException.
          continue;
        }
        if (key.isAcceptable()) {
          register(selector, serverSocket);
        }
        else if (key.isReadable()) {
          try {
            getRequest(key);
          }
          catch (Exception e) {
            e.printStackTrace();
            closeClient(key);
          }
        }
      }
    }
  }

  /**
   * Deregisters and closes the client channel behind {@code key}. A failure to close
   * is logged instead of propagated, so one broken connection cannot stop the server.
   *
   * @param key selection key whose channel is a client {@link SocketChannel}
   */
  private void closeClient(SelectionKey key) {
    SocketChannel client = (SocketChannel) key.channel();
    // Log the address before closing the channel.
    System.err.println("Closing client connection at: " + client.socket().getRemoteSocketAddress());
    key.cancel();
    try {
      client.close();
    }
    catch (IOException closeFailure) {
      System.err.println("Failed to close client connection: " + closeFailure.getMessage());
    }
  }

  /**
   * Performs one read from the client behind {@code key}, deserializes the received
   * bytes into a {@code Message} and sends the response.
   *
   * <p>NOTE(review): the request is still expected to arrive in a single read of at
   * most 1 MiB; a request split across several reads fails deserialization. Fixing
   * that needs message framing on both sides of the connection.
   *
   * @param key selection key whose channel is a readable {@link SocketChannel}
   * @throws Exception if the client has disconnected, or if reading,
   *                   deserialization or writing fails
   */
  private void getRequest(SelectionKey key) throws Exception {
    SocketChannel client = (SocketChannel) key.channel();
    ByteBuffer requestBuffer = ByteBuffer.allocate(1024 * 1024);
    int read = client.read(requestBuffer);
    if (read == -1) {
      key.cancel();
      throw new IOException("Client disconnected at: " + client.socket().getRemoteSocketAddress());
    }
    if (read == 0) {
      // Nothing available right now on the non-blocking channel; wait for the next event
      // instead of trying to deserialize an empty array.
      return;
    }

    // flip() limits the buffer to exactly the bytes just read and rewinds to position 0.
    requestBuffer.flip();
    byte[] bytes = new byte[requestBuffer.remaining()];
    requestBuffer.get(bytes);

    // NOTE(review): Java-native deserialization of client-supplied bytes is only
    // acceptable when every client is trusted.
    Message message = SerializationUtils.deserialize(bytes);
    sendResponse(client, message);
  }

  /**
   * Sets a fixed result on {@code message}, serializes it and writes it to the client.
   *
   * @param client  channel to write the response to
   * @param message message to update and send back
   * @throws IOException if writing to the channel fails
   */
  private void sendResponse(SocketChannel client, Message message) throws IOException {
    message.setResult("Some result");
    ByteBuffer responseBuffer = ByteBuffer.wrap(SerializationUtils.serialize(message));
    // A non-blocking write may transfer only part of the buffer; repeat until it is empty.
    while (responseBuffer.hasRemaining()) {
      client.write(responseBuffer);
    }
  }

  /**
   * Accepts a pending connection, makes it non-blocking and registers it for reads.
   *
   * @param selector     selector the new client channel is registered with
   * @param serverSocket listening channel to accept from
   * @throws IOException if accepting or configuring the channel fails
   */
  private void register(Selector selector, ServerSocketChannel serverSocket) throws IOException {
    SocketChannel client = serverSocket.accept();
    if (client == null) {
      // In non-blocking mode accept() returns null when no connection is pending.
      return;
    }
    client.configureBlocking(false);
    client.register(selector, SelectionKey.OP_READ);
    System.out.println("New client at: " + client.socket().getRemoteSocketAddress());
  }

  /** Entry point: creates a server and starts its selector loop. */
  public static void main(String[] args) throws Exception {
    new Server().start();
  }
}