Stx/Etx spring 集成中的序列化程序
Stx/Etx serializer in spring integration
我正在开发一个使用套接字与服务器通信的应用程序。
我在客户端使用 spring 集成,在服务器端使用 java ServerSocket
。
我使用 STX/ETX 序列化套接字上的消息以指示消息的开始和结束为此目的我使用 spring ByteArrayStxEtxSerializer
class.
但问题是当 ByteArrayStxEtxSerializer
用作序列化时,在服务器端收到的消息有一些延迟,反之亦然,有时甚至没有收到消息。
唯一让我感到困惑的是,当我用ByteArrayCrLfSerializer
检查相同的场景时,整个过程很顺利,没有任何延迟或失败。
我的客户端和服务器端代码片段如下:
服务器端:
public void startSocketServer(){
try (final ServerSocket serverSocket = new ServerSocket(9992)) {
gl.info("Server is listening on: " + serverSocket.getLocalSocketAddress());
while (true) {
final Socket socket = serverSocket.accept();
gl.info("A new client connected");
new SocketThread(socket).start();
}
} catch (IOException e) {
e.printStackTrace();
}
}
private class SocketThread extends Thread {
private final Socket socket;
private final PrintWriter writer;
private final BufferedReader reader;
public SocketThread(Socket socket) throws IOException {
this.socket = socket;
InputStream input = socket.getInputStream();
OutputStream output = socket.getOutputStream();
reader = new BufferedReader(new InputStreamReader(input));
writer = new PrintWriter(output, true);
}
public void run() {
try {
while (true) {
String inputMessage = reader.readLine();
if (inputMessage != null) {
MessageType messageType = getTypeInstance(inputMessage);
if (messageType instanceof LoginMessage loginMessage) {
if (isAuthenticated(loginMessage.getUsername(), loginMessage.getPassword())) {
gl.info("#### SERVER => User authorized");
final String messageBody = createConnectionAckMessage();
print(writer, messageBody);
} else {
print(writer, createRefusalMessage());
}
} else if (messageType instanceof StartTransferingData startData) {
getMessages().forEach(message-> print(writer, message));
} else if (messageType instanceof DisconnectionAck disAck) {
print(writer, "By then")
break;
}
}
}
socket.close();
} catch (IOException ex) {
gl.info("Server exception: " + ex.getMessage());
}
}
private void print(PrintWriter writer, String msg) {
writer.print(ByteArrayStxEtxSerializer.STX);
writer.print(msg);
writer.print(ByteArrayStxEtxSerializer.ETX);
}
}
和客户端:
public class CapConfig {
@MessagingGateway(defaultRequestChannel = "toTcp", errorChannel = "errorChannel")
public interface TcpGateway {
@Gateway
void send(String in);
}
@Bean
public MessageChannel toTcp() {
return new DirectChannel();
}
@Bean
public AbstractClientConnectionFactory clientCF() {
return Tcp.netClient("localhost", 9992)
.serializer(TcpCodecs.stxetx())
.deserializer(TcpCodecs.stxetx())
.get();
}
@Bean
public IntegrationFlow tcpOutFlow(AbstractClientConnectionFactory connectionFactory) {
return IntegrationFlows.from(toTcp())
.handle(Tcp.outboundAdapter(connectionFactory))
.get();
}
@Bean
public IntegrationFlow tcpInFlow(AbstractClientConnectionFactory connectionFactory) {
return IntegrationFlows.from(Tcp.inboundAdapter(connectionFactory))
.transform(stringTransformer)
.log()
//---- Do some other stuffs
.get();
}
}
问题是 reader.readLine()
,因为 reader 搜索 \n
以识别行尾。
我用了STX/ETX来表示消息的开始和结束,那我就自己解析了。
private String read() throws IOException {
int bite = reader.read();
if (bite!= ByteArrayStxEtxSerializer.STX){
throw new RuntimeException();
}
char[] result = new result[socket.getReceiveBufferSize()]
while((bite=reader.read())!= ByteArrayStxEtxSerializer.ETX){
//-- Collect the bytes and do related stuff
result.....
}
return new String(result);
}
我正在开发一个使用套接字与服务器通信的应用程序。
我在客户端使用 spring 集成,在服务器端使用 java ServerSocket
。
我使用 STX/ETX 序列化套接字上的消息以指示消息的开始和结束为此目的我使用 spring ByteArrayStxEtxSerializer
class.
但问题是当 ByteArrayStxEtxSerializer
用作序列化时,在服务器端收到的消息有一些延迟,反之亦然,有时甚至没有收到消息。
唯一让我感到困惑的是,当我用ByteArrayCrLfSerializer
检查相同的场景时,整个过程很顺利,没有任何延迟或失败。
我的客户端和服务器端代码片段如下:
服务器端:
public void startSocketServer(){
try (final ServerSocket serverSocket = new ServerSocket(9992)) {
gl.info("Server is listening on: " + serverSocket.getLocalSocketAddress());
while (true) {
final Socket socket = serverSocket.accept();
gl.info("A new client connected");
new SocketThread(socket).start();
}
} catch (IOException e) {
e.printStackTrace();
}
}
private class SocketThread extends Thread {
private final Socket socket;
private final PrintWriter writer;
private final BufferedReader reader;
public SocketThread(Socket socket) throws IOException {
this.socket = socket;
InputStream input = socket.getInputStream();
OutputStream output = socket.getOutputStream();
reader = new BufferedReader(new InputStreamReader(input));
writer = new PrintWriter(output, true);
}
public void run() {
try {
while (true) {
String inputMessage = reader.readLine();
if (inputMessage != null) {
MessageType messageType = getTypeInstance(inputMessage);
if (messageType instanceof LoginMessage loginMessage) {
if (isAuthenticated(loginMessage.getUsername(), loginMessage.getPassword())) {
gl.info("#### SERVER => User authorized");
final String messageBody = createConnectionAckMessage();
print(writer, messageBody);
} else {
print(writer, createRefusalMessage());
}
} else if (messageType instanceof StartTransferingData startData) {
getMessages().forEach(message-> print(writer, message));
} else if (messageType instanceof DisconnectionAck disAck) {
print(writer, "By then")
break;
}
}
}
socket.close();
} catch (IOException ex) {
gl.info("Server exception: " + ex.getMessage());
}
}
private void print(PrintWriter writer, String msg) {
writer.print(ByteArrayStxEtxSerializer.STX);
writer.print(msg);
writer.print(ByteArrayStxEtxSerializer.ETX);
}
}
和客户端:
public class CapConfig {
@MessagingGateway(defaultRequestChannel = "toTcp", errorChannel = "errorChannel")
public interface TcpGateway {
@Gateway
void send(String in);
}
@Bean
public MessageChannel toTcp() {
return new DirectChannel();
}
@Bean
public AbstractClientConnectionFactory clientCF() {
return Tcp.netClient("localhost", 9992)
.serializer(TcpCodecs.stxetx())
.deserializer(TcpCodecs.stxetx())
.get();
}
@Bean
public IntegrationFlow tcpOutFlow(AbstractClientConnectionFactory connectionFactory) {
return IntegrationFlows.from(toTcp())
.handle(Tcp.outboundAdapter(connectionFactory))
.get();
}
@Bean
public IntegrationFlow tcpInFlow(AbstractClientConnectionFactory connectionFactory) {
return IntegrationFlows.from(Tcp.inboundAdapter(connectionFactory))
.transform(stringTransformer)
.log()
//---- Do some other stuffs
.get();
}
}
问题是 reader.readLine()
,因为 reader 搜索 \n
以识别行尾。
我用了STX/ETX来表示消息的开始和结束,那我就自己解析了。
private String read() throws IOException {
int bite = reader.read();
if (bite!= ByteArrayStxEtxSerializer.STX){
throw new RuntimeException();
}
char[] result = new result[socket.getReceiveBufferSize()]
while((bite=reader.read())!= ByteArrayStxEtxSerializer.ETX){
//-- Collect the bytes and do related stuff
result.....
}
return new String(result);
}