STX/ETX serializer in Spring Integration

I am developing an application that communicates with a server over sockets.

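I use Spring Integration on the client side and a plain Java ServerSocket on the server side. Messages on the socket are framed with STX/ETX to mark the start and end of each message; for this I use Spring's ByteArrayStxEtxSerializer class.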

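The problem is that when ByteArrayStxEtxSerializer is used as the serializer, messages reach the server side with some delay, and the same happens in the other direction; sometimes the messages are not received at all.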

What puzzles me is that when I run the same scenario with ByteArrayCrLfSerializer, everything works smoothly, without any delay or failure.

My client-side and server-side code snippets are below.

Server side:

public void startSocketServer(){

  try (final ServerSocket serverSocket = new ServerSocket(9992)) {
       gl.info("Server is listening on: " + serverSocket.getLocalSocketAddress());

     while (true) {
         final Socket socket = serverSocket.accept();
         gl.info("A new client connected");
         new SocketThread(socket).start();
     }
  } catch (IOException e) {
    e.printStackTrace();
  }
}

private class SocketThread extends Thread {
  private final Socket socket;
  private final PrintWriter writer;
  private final BufferedReader reader;

  public SocketThread(Socket socket) throws IOException {
     this.socket = socket;
     InputStream input = socket.getInputStream();
     OutputStream output = socket.getOutputStream();

     reader = new BufferedReader(new InputStreamReader(input));
     writer = new PrintWriter(output, true);
  }
  public void run() {
     try {
         while (true) {
             String inputMessage = reader.readLine();
             if (inputMessage != null) {
                 MessageType messageType = getTypeInstance(inputMessage);

                if (messageType instanceof LoginMessage loginMessage) {
                     if (isAuthenticated(loginMessage.getUsername(), loginMessage.getPassword())) {
                        gl.info("#### SERVER => User authorized");
                        final String messageBody = createConnectionAckMessage();
                        print(writer, messageBody);
                     } else {
                         print(writer, createRefusalMessage());
                     }
                } else if (messageType instanceof StartTransferingData startData) {
                    getMessages().forEach(message-> print(writer, message));

                } else if (messageType instanceof DisconnectionAck disAck) {
                    print(writer, "By then");
                    break;
                }
            }
         }
         socket.close();
     } catch (IOException ex) {
         gl.info("Server exception: " + ex.getMessage());
     }
 }
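 private void print(PrintWriter writer, String msg) {
     // STX/ETX are int constants: write(int) emits each as a single character,
     // whereas print(int) would emit the decimal text "2" / "3"
     writer.write(ByteArrayStxEtxSerializer.STX);
     writer.write(msg);
     writer.write(ByteArrayStxEtxSerializer.ETX);
     // autoFlush only applies to println/printf/format, so flush explicitly
     writer.flush();
 }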
}
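
A side note on the print helper: `PrintWriter.print(int)` and `PrintWriter.write(int)` behave differently, and the `autoFlush` flag only applies to `println`, `printf` and `format`. The sketch below is plain Java with no Spring dependency; the class name `PrintWriterDemo`, the `send` helper and the local `STX` constant (assumed to equal `ByteArrayStxEtxSerializer.STX`, i.e. 0x02) are illustrative only. It shows which bytes actually reach the underlying stream in each case.

```java
import java.io.ByteArrayOutputStream;
import java.io.PrintWriter;
import java.util.Arrays;

public class PrintWriterDemo {
    // Assumption: same value as ByteArrayStxEtxSerializer.STX
    static final int STX = 0x02;

    /** Sends STX through an auto-flushing PrintWriter and returns the bytes that reached the sink. */
    static byte[] send(boolean useWrite, boolean flush) {
        ByteArrayOutputStream sink = new ByteArrayOutputStream();
        PrintWriter writer = new PrintWriter(sink, true); // autoFlush = true
        if (useWrite) {
            writer.write(STX);  // writes the single character U+0002
        } else {
            writer.print(STX);  // writes String.valueOf(2), i.e. the text "2"
        }
        if (flush) {
            writer.flush();
        }
        return sink.toByteArray();
    }

    public static void main(String[] args) {
        // Nothing arrives without a flush, even though autoFlush is enabled
        System.out.println("print, no flush: " + Arrays.toString(send(false, false)));
        // The digit '2' (0x32) arrives instead of the control character
        System.out.println("print + flush:   " + Arrays.toString(send(false, true)));
        // The real STX control character (0x02) arrives
        System.out.println("write + flush:   " + Arrays.toString(send(true, true)));
    }
}
```

On a real socket the unflushed case means the peer waits until the writer's buffer fills up or the connection is closed, which can look like delayed or missing messages.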

And the client side:

public class CapConfig {

   @MessagingGateway(defaultRequestChannel = "toTcp", errorChannel = "errorChannel")
   public interface TcpGateway {
     @Gateway
     void send(String in);
   }

   @Bean
   public MessageChannel toTcp() {
     return new DirectChannel();
   }

   @Bean
   public AbstractClientConnectionFactory clientCF() {
      return Tcp.netClient("localhost", 9992)
          .serializer(TcpCodecs.stxetx())
          .deserializer(TcpCodecs.stxetx())
          .get();
   }

   @Bean
   public IntegrationFlow tcpOutFlow(AbstractClientConnectionFactory connectionFactory) {
      return IntegrationFlows.from(toTcp())
         .handle(Tcp.outboundAdapter(connectionFactory))
         .get();
   }

   @Bean
   public IntegrationFlow tcpInFlow(AbstractClientConnectionFactory connectionFactory) {
      return IntegrationFlows.from(Tcp.inboundAdapter(connectionFactory))
         .transform(stringTransformer)
         .log()
         //---- Do some other stuffs
         .get();
   } 
}

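The problem was reader.readLine(): the reader looks for a line terminator (\n) to recognize the end of a line, and an STX/ETX-framed message never contains one, so the call keeps waiting.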
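
To make the difference concrete, here is a minimal sketch in plain Java, without Spring. The class name `ReadLineDemo`, the `firstLine` helper and the local `STX`/`ETX` constants (assumed to be 0x02 and 0x03, as in ByteArrayStxEtxSerializer) are made up for illustration. It runs `readLine()` over an in-memory stream, once with CRLF-terminated messages and once with STX/ETX frames.

```java
import java.io.BufferedReader;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;

public class ReadLineDemo {
    // Assumption: same values as ByteArrayStxEtxSerializer.STX / ETX
    static final char STX = 0x02;
    static final char ETX = 0x03;

    /** Returns what readLine() yields first for the given raw wire content. */
    static String firstLine(String wire) throws IOException {
        BufferedReader reader = new BufferedReader(new InputStreamReader(
                new ByteArrayInputStream(wire.getBytes(StandardCharsets.US_ASCII)),
                StandardCharsets.US_ASCII));
        return reader.readLine();
    }

    public static void main(String[] args) throws IOException {
        // CRLF framing: readLine() stops at the terminator and strips it
        String crlf = firstLine("login\r\nnext\r\n");
        System.out.println("CRLF    -> [" + crlf + "]");

        // STX/ETX framing: no terminator exists, so readLine() only returns
        // once the stream ends, with both frames glued together
        String stxEtx = firstLine(STX + "login" + ETX + STX + "next" + ETX);
        System.out.println("STX/ETX -> " + stxEtx.length() + " chars in one 'line'");
    }
}
```

The in-memory stream ends, so `readLine()` eventually returns. On an open socket there is no end of stream, so the same call would simply block until the connection is closed.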

Since I use STX/ETX to mark the start and end of each message, I ended up parsing the stream myself:

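        private String read() throws IOException {
           // Every frame must start with STX (0x02)
           int bite = reader.read();
           if (bite != ByteArrayStxEtxSerializer.STX) {
              throw new RuntimeException("Expected STX but received: " + bite);
           }
           StringBuilder result = new StringBuilder();
           // Collect characters until ETX (0x03) closes the frame
           while ((bite = reader.read()) != ByteArrayStxEtxSerializer.ETX) {
              if (bite == -1) {
                 // read() returns -1 at end of stream; without this check the loop never ends
                 throw new IOException("Stream closed before ETX was received");
              }
              result.append((char) bite);
           }

           return result.toString();
        }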
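
For anyone who wants to try the framing logic without a socket, the following is a minimal, self-contained sketch of the same idea. The class `StxEtxFrames` and its `readFrame`/`writeFrame` helpers are hypothetical names, and the constants are assumed to match ByteArrayStxEtxSerializer (0x02 and 0x03). It is not the Spring implementation, just a round trip over in-memory streams.

```java
import java.io.IOException;
import java.io.Reader;
import java.io.StringReader;
import java.io.StringWriter;
import java.io.Writer;

public class StxEtxFrames {
    // Assumption: same values as ByteArrayStxEtxSerializer.STX / ETX
    static final int STX = 0x02;
    static final int ETX = 0x03;

    /** Reads one frame; returns null if the stream ends cleanly before a new frame starts. */
    static String readFrame(Reader reader) throws IOException {
        int c = reader.read();
        if (c == -1) {
            return null;
        }
        if (c != STX) {
            throw new IOException("Expected STX (0x02) but got 0x" + Integer.toHexString(c));
        }
        StringBuilder payload = new StringBuilder();
        while ((c = reader.read()) != ETX) {
            if (c == -1) {
                throw new IOException("Stream closed before ETX (0x03)");
            }
            payload.append((char) c);
        }
        return payload.toString();
    }

    /** Writes STX + msg + ETX and flushes so the peer sees the frame immediately. */
    static void writeFrame(Writer writer, String msg) throws IOException {
        writer.write(STX); // write(int) emits one character, not the text "2"
        writer.write(msg);
        writer.write(ETX);
        writer.flush();
    }

    public static void main(String[] args) throws IOException {
        StringWriter wire = new StringWriter();
        writeFrame(wire, "login");
        writeFrame(wire, "start");

        Reader reader = new StringReader(wire.toString());
        System.out.println(readFrame(reader)); // first frame
        System.out.println(readFrame(reader)); // second frame
        System.out.println(readFrame(reader)); // null: no more frames
    }
}
```

On the server, `readFrame` would be called with the socket's reader in place of `readLine()`, and `writeFrame` with the socket's writer in place of the `print` helper.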