如何在 spring 启动时创建工作 TCP 服务器套接字以及如何处理传入消息?
How to create a working TCP Server socket in spring boot and how to handle the incoming message?
我试图在现有的 spring 启动应用程序中实现一个带有 spring 集成的 TCP 服务器套接字,但我遇到了一个问题,这个问题让我发疯...
客户端正在向服务器发送消息(字节数组)并超时。而已。
我没有从服务器收到任何异常。看来我提供了错误的端口或其他东西,但在检查端口后,我确定它是正确的。
这是我基于注解的配置class:
import home.brew.server.socket.ServerSocketHandler;
import lombok.extern.log4j.Log4j2;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.config.EnableIntegration;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.IntegrationFlows;
import org.springframework.integration.ip.dsl.Tcp;
@Log4j2
@Configuration
@EnableIntegration
public class TcpServerSocketConfiguration {
@Value("${socket.port}")
private int serverSocketPort;
@Bean
public IntegrationFlow server(ServerSocketHandler serverSocketHandler) {
TcpServerConnectionFactorySpec connectionFactory =
Tcp.netServer(socketPort)
.deserializer(new CustomSerializerDeserializer())
.serializer(new CustomSerializerDeserializer())
.soTcpNoDelay(true);
TcpInboundGatewaySpec inboundGateway =
Tcp.inboundGateway(connectionFactory);
return IntegrationFlows
.from(inboundGateway)
.handle(serverSocketHandler::handleMessage)
.get();
}
@Bean
public ServerSocketHandler serverSocketHandler() {
return new ServerSocketHandler();
}
}
我想在尝试发送答复之前让接收功能正常工作,所以这就是为什么要进行最小配置的原因。
而下面的class应该处理从服务器套接字接收到的消息
import lombok.extern.log4j.Log4j2;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.MessagingException;
@Log4j2
public class ServerSocketHandler {
public String handleMessage(Message<?> message, MessageHeaders messageHeaders) {
log.info(message.getPayload());
// TODO implement something useful to process the incoming message here...
return message.getPayload().toString();
}
}
上面的处理程序方法从未被调用过一次!
我已经在谷歌上搜索了一些示例实现或教程,但我还没有找到对我有用的东西。
我已经尝试了这些网站的实现:
- https://vispud.blogspot.com/2019/03/how-to-implement-simple-echo-socket.html
- https://docs.spring.io/spring-integration/docs/current/reference/html/ip.html#note-nio
还有很多网站...但对我没有任何帮助:-(
更新 1
我已经实现了自定义 serializer/deserializer:
import lombok.Data;
import lombok.extern.log4j.Log4j2;
import org.springframework.core.serializer.Deserializer;
import org.springframework.core.serializer.Serializer;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
@Log4j2
@Data
public class CustomSerializerDeserializer implements Serializer<byte[]>,
Deserializer<byte[]> {
@Override
public byte[] deserialize(InputStream inputStream) throws IOException {
return inputStream.readAllBytes();
}
@Override
public void serialize(byte[] object, OutputStream outputStream) throws IOException {
outputStream.write(object);
}
}
客户端发送消息后,调用自定义序列化程序,但内容始终为空。我不知道为什么......序列化程序需要大量时间从流中读取所有字节,最后它是空的。这个过程一直在重复,所以我想我不小心建立了一个无限循环...
更新 2
我已经捕获了客户端和服务器套接字之间的通信:
看起来我卡在了握手过程中,因此没有有效负载...
因此,如果有人可以帮助我解决这个问题,我将非常感激,如果您需要更多信息,请告诉我。
提前致谢!
您如何与该服务器通信?默认情况下,连接工厂配置为要求输入由 CRLF 终止(例如 Telnet)。如果您的客户端使用其他东西来指示消息结束,则必须配置不同的反序列化器。
另外,您的方法签名不正确;应该是:
public String handleMessage(byte[] message, MessageHeaders messageHeaders) {
String string = new String(message);
System.out.println(string);
return string.toUpperCase();
}
我使用 Telnet 时效果很好:
$ telnet localhost 1234
Trying ::1...
Connected to localhost.
Escape character is '^]'.
foo
FOO
^]
telnet> quit
Connection closed.
这是一个仅适用于 LF 的版本(例如 netcat):
@Bean
public IntegrationFlow server(ServerSocketHandler serverSocketHandler) {
return IntegrationFlows.from(Tcp.inboundGateway(
Tcp.netServer(1234)
.deserializer(TcpCodecs.lf())
.serializer(TcpCodecs.lf())))
.handle(serverSocketHandler::handleMessage)
.get();
}
$ nc localhost 1234
foo
FOO
^C
好吧,经过几天的分析和编码,我找到了使用 spring 集成处理 TCP 套接字通信的最佳解决方案。对于正在为同样的问题苦苦挣扎的其他开发人员。这是我到目前为止所做的。
这个 class 包含一个 - 对我来说 - 基于注释的 TCP 套接字连接配置
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.annotation.IntegrationComponentScan;
import org.springframework.integration.annotation.MessagingGateway;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.config.EnableIntegration;
import org.springframework.integration.ip.IpHeaders;
import org.springframework.integration.ip.tcp.TcpInboundGateway;
import org.springframework.integration.ip.tcp.TcpOutboundGateway;
import org.springframework.integration.ip.tcp.connection.AbstractClientConnectionFactory;
import org.springframework.integration.ip.tcp.connection.AbstractServerConnectionFactory;
import org.springframework.integration.ip.tcp.connection.TcpNetClientConnectionFactory;
import org.springframework.integration.ip.tcp.connection.TcpNetServerConnectionFactory;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.web.context.request.RequestContextListener;
/**
* Spring annotation based configuration
*/
@Configuration
@EnableIntegration
@IntegrationComponentScan
public class TcpServerSocketConfiguration {
public static final CustomSerializerDeserializer SERIALIZER = new CustomSerializerDeserializer();
@Value("${socket.port}")
private int socketPort;
/**
* Reply messages are routed to the connection only if the reply contains the ip_connectionId header
* that was inserted into the original message by the connection factory.
*/
@MessagingGateway(defaultRequestChannel = "toTcp")
public interface Gateway {
void send(String message, @Header(IpHeaders.CONNECTION_ID) String connectionId);
}
@Bean
public MessageChannel fromTcp() {
return new DirectChannel();
}
@Bean
public MessageChannel toTcp() {
return new DirectChannel();
}
@Bean
public AbstractServerConnectionFactory serverCF() {
TcpNetServerConnectionFactory serverCf = new TcpNetServerConnectionFactory(socketPort);
serverCf.setSerializer(SERIALIZER);
serverCf.setDeserializer(SERIALIZER);
serverCf.setSoTcpNoDelay(true);
serverCf.setSoKeepAlive(true);
// serverCf.setSingleUse(true);
// final int soTimeout = 5000;
// serverCf.setSoTimeout(soTimeout);
return serverCf;
}
@Bean
public AbstractClientConnectionFactory clientCF() {
TcpNetClientConnectionFactory clientCf = new TcpNetClientConnectionFactory("localhost", socketPort);
clientCf.setSerializer(SERIALIZER);
clientCf.setDeserializer(SERIALIZER);
clientCf.setSoTcpNoDelay(true);
clientCf.setSoKeepAlive(true);
// clientCf.setSingleUse(true);
// final int soTimeout = 5000;
// clientCf.setSoTimeout(soTimeout);
return clientCf;
}
@Bean
public TcpInboundGateway tcpInGate() {
TcpInboundGateway inGate = new TcpInboundGateway();
inGate.setConnectionFactory(serverCF());
inGate.setRequestChannel(fromTcp());
inGate.setReplyChannel(toTcp());
return inGate;
}
@Bean
public TcpOutboundGateway tcpOutGate() {
TcpOutboundGateway outGate = new TcpOutboundGateway();
outGate.setConnectionFactory(clientCF());
outGate.setReplyChannel(toTcp());
return outGate;
}
此 class 包含自定义序列化器和反序列化器
import lombok.extern.log4j.Log4j2;
import org.jetbrains.annotations.NotNull;
import org.springframework.core.serializer.Deserializer;
import org.springframework.core.serializer.Serializer;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;
/**
* A custom serializer for incoming and/or outcoming messages.
*/
@Log4j2
public class CustomSerializerDeserializer implements Serializer<byte[]>, Deserializer<byte[]> {
@NotNull
@Override
public byte[] deserialize(InputStream inputStream) throws IOException {
byte[] message = new byte[0];
if (inputStream.available() > 0) {
message = inputStream.readAllBytes();
}
log.debug("Deserialized message {}", new String(message, StandardCharsets.UTF_8));
return message;
}
@Override
public void serialize(@NotNull byte[] message, OutputStream outputStream) throws IOException {
log.info("Serializing {}", new String(message, StandardCharsets.UTF_8));
outputStream.write(message);
outputStream.flush();
}
}
在接下来的 classes 中,您可以实现一些业务逻辑来处理传入...
import lombok.extern.log4j.Log4j2;
import org.springframework.integration.annotation.MessageEndpoint;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.stereotype.Component;
@Log4j2
@Component
@MessageEndpoint
public class ClientSocketHandler {
@ServiceActivator(inputChannel = "toTcp")
public byte[] handleMessage(byte[] msg) {
// TODO implement some buisiness logic here
return msg;
}
}
和外发消息。
import lombok.extern.log4j.Log4j2;
import org.springframework.integration.annotation.MessageEndpoint;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.stereotype.Component;
@Log4j2
@Component
@MessageEndpoint
public class ClientSocketHandler {
@ServiceActivator(inputChannel = "toTcp")
public byte[] handleMessage(byte[] msg) {
// implement some business logic here
return msg;
}
}
希望对您有所帮助。 ;-)
我试图在现有的 spring 启动应用程序中实现一个带有 spring 集成的 TCP 服务器套接字,但我遇到了一个问题,这个问题让我发疯... 客户端正在向服务器发送消息(字节数组)并超时。而已。 我没有从服务器收到任何异常。看来我提供了错误的端口或其他东西,但在检查端口后,我确定它是正确的。
这是我基于注解的配置class:
import home.brew.server.socket.ServerSocketHandler;
import lombok.extern.log4j.Log4j2;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.config.EnableIntegration;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.IntegrationFlows;
import org.springframework.integration.ip.dsl.Tcp;
@Log4j2
@Configuration
@EnableIntegration
public class TcpServerSocketConfiguration {
@Value("${socket.port}")
private int serverSocketPort;
@Bean
public IntegrationFlow server(ServerSocketHandler serverSocketHandler) {
TcpServerConnectionFactorySpec connectionFactory =
Tcp.netServer(socketPort)
.deserializer(new CustomSerializerDeserializer())
.serializer(new CustomSerializerDeserializer())
.soTcpNoDelay(true);
TcpInboundGatewaySpec inboundGateway =
Tcp.inboundGateway(connectionFactory);
return IntegrationFlows
.from(inboundGateway)
.handle(serverSocketHandler::handleMessage)
.get();
}
@Bean
public ServerSocketHandler serverSocketHandler() {
return new ServerSocketHandler();
}
}
我想在尝试发送答复之前让接收功能正常工作,所以这就是为什么要进行最小配置的原因。
而下面的class应该处理从服务器套接字接收到的消息
import lombok.extern.log4j.Log4j2;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.MessagingException;
@Log4j2
public class ServerSocketHandler {
public String handleMessage(Message<?> message, MessageHeaders messageHeaders) {
log.info(message.getPayload());
// TODO implement something useful to process the incoming message here...
return message.getPayload().toString();
}
}
上面的处理程序方法从未被调用过一次! 我已经在谷歌上搜索了一些示例实现或教程,但我还没有找到对我有用的东西。 我已经尝试了这些网站的实现:
- https://vispud.blogspot.com/2019/03/how-to-implement-simple-echo-socket.html
- https://docs.spring.io/spring-integration/docs/current/reference/html/ip.html#note-nio
还有很多网站...但对我没有任何帮助:-(
更新 1
我已经实现了自定义 serializer/deserializer:
import lombok.Data;
import lombok.extern.log4j.Log4j2;
import org.springframework.core.serializer.Deserializer;
import org.springframework.core.serializer.Serializer;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
@Log4j2
@Data
public class CustomSerializerDeserializer implements Serializer<byte[]>,
Deserializer<byte[]> {
@Override
public byte[] deserialize(InputStream inputStream) throws IOException {
return inputStream.readAllBytes();
}
@Override
public void serialize(byte[] object, OutputStream outputStream) throws IOException {
outputStream.write(object);
}
}
客户端发送消息后,调用自定义序列化程序,但内容始终为空。我不知道为什么......序列化程序需要大量时间从流中读取所有字节,最后它是空的。这个过程一直在重复,所以我想我不小心建立了一个无限循环...
更新 2
我已经捕获了客户端和服务器套接字之间的通信: 看起来我卡在了握手过程中,因此没有有效负载...
因此,如果有人可以帮助我解决这个问题,我将非常感激,如果您需要更多信息,请告诉我。
提前致谢!
您如何与该服务器通信?默认情况下,连接工厂配置为要求输入由 CRLF 终止(例如 Telnet)。如果您的客户端使用其他东西来指示消息结束,则必须配置不同的反序列化器。
另外,您的方法签名不正确;应该是:
public String handleMessage(byte[] message, MessageHeaders messageHeaders) {
String string = new String(message);
System.out.println(string);
return string.toUpperCase();
}
我使用 Telnet 时效果很好:
$ telnet localhost 1234
Trying ::1...
Connected to localhost.
Escape character is '^]'.
foo
FOO
^]
telnet> quit
Connection closed.
这是一个仅适用于 LF 的版本(例如 netcat):
@Bean
public IntegrationFlow server(ServerSocketHandler serverSocketHandler) {
return IntegrationFlows.from(Tcp.inboundGateway(
Tcp.netServer(1234)
.deserializer(TcpCodecs.lf())
.serializer(TcpCodecs.lf())))
.handle(serverSocketHandler::handleMessage)
.get();
}
$ nc localhost 1234
foo
FOO
^C
好吧,经过几天的分析和编码,我找到了使用 spring 集成处理 TCP 套接字通信的最佳解决方案。对于正在为同样的问题苦苦挣扎的其他开发人员。这是我到目前为止所做的。
这个 class 包含一个 - 对我来说 - 基于注释的 TCP 套接字连接配置
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.annotation.IntegrationComponentScan;
import org.springframework.integration.annotation.MessagingGateway;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.config.EnableIntegration;
import org.springframework.integration.ip.IpHeaders;
import org.springframework.integration.ip.tcp.TcpInboundGateway;
import org.springframework.integration.ip.tcp.TcpOutboundGateway;
import org.springframework.integration.ip.tcp.connection.AbstractClientConnectionFactory;
import org.springframework.integration.ip.tcp.connection.AbstractServerConnectionFactory;
import org.springframework.integration.ip.tcp.connection.TcpNetClientConnectionFactory;
import org.springframework.integration.ip.tcp.connection.TcpNetServerConnectionFactory;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.web.context.request.RequestContextListener;
/**
* Spring annotation based configuration
*/
@Configuration
@EnableIntegration
@IntegrationComponentScan
public class TcpServerSocketConfiguration {
public static final CustomSerializerDeserializer SERIALIZER = new CustomSerializerDeserializer();
@Value("${socket.port}")
private int socketPort;
/**
* Reply messages are routed to the connection only if the reply contains the ip_connectionId header
* that was inserted into the original message by the connection factory.
*/
@MessagingGateway(defaultRequestChannel = "toTcp")
public interface Gateway {
void send(String message, @Header(IpHeaders.CONNECTION_ID) String connectionId);
}
@Bean
public MessageChannel fromTcp() {
return new DirectChannel();
}
@Bean
public MessageChannel toTcp() {
return new DirectChannel();
}
@Bean
public AbstractServerConnectionFactory serverCF() {
TcpNetServerConnectionFactory serverCf = new TcpNetServerConnectionFactory(socketPort);
serverCf.setSerializer(SERIALIZER);
serverCf.setDeserializer(SERIALIZER);
serverCf.setSoTcpNoDelay(true);
serverCf.setSoKeepAlive(true);
// serverCf.setSingleUse(true);
// final int soTimeout = 5000;
// serverCf.setSoTimeout(soTimeout);
return serverCf;
}
@Bean
public AbstractClientConnectionFactory clientCF() {
TcpNetClientConnectionFactory clientCf = new TcpNetClientConnectionFactory("localhost", socketPort);
clientCf.setSerializer(SERIALIZER);
clientCf.setDeserializer(SERIALIZER);
clientCf.setSoTcpNoDelay(true);
clientCf.setSoKeepAlive(true);
// clientCf.setSingleUse(true);
// final int soTimeout = 5000;
// clientCf.setSoTimeout(soTimeout);
return clientCf;
}
@Bean
public TcpInboundGateway tcpInGate() {
TcpInboundGateway inGate = new TcpInboundGateway();
inGate.setConnectionFactory(serverCF());
inGate.setRequestChannel(fromTcp());
inGate.setReplyChannel(toTcp());
return inGate;
}
@Bean
public TcpOutboundGateway tcpOutGate() {
TcpOutboundGateway outGate = new TcpOutboundGateway();
outGate.setConnectionFactory(clientCF());
outGate.setReplyChannel(toTcp());
return outGate;
}
此 class 包含自定义序列化器和反序列化器
import lombok.extern.log4j.Log4j2;
import org.jetbrains.annotations.NotNull;
import org.springframework.core.serializer.Deserializer;
import org.springframework.core.serializer.Serializer;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;
/**
* A custom serializer for incoming and/or outcoming messages.
*/
@Log4j2
public class CustomSerializerDeserializer implements Serializer<byte[]>, Deserializer<byte[]> {
@NotNull
@Override
public byte[] deserialize(InputStream inputStream) throws IOException {
byte[] message = new byte[0];
if (inputStream.available() > 0) {
message = inputStream.readAllBytes();
}
log.debug("Deserialized message {}", new String(message, StandardCharsets.UTF_8));
return message;
}
@Override
public void serialize(@NotNull byte[] message, OutputStream outputStream) throws IOException {
log.info("Serializing {}", new String(message, StandardCharsets.UTF_8));
outputStream.write(message);
outputStream.flush();
}
}
在接下来的 classes 中,您可以实现一些业务逻辑来处理传入...
import lombok.extern.log4j.Log4j2;
import org.springframework.integration.annotation.MessageEndpoint;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.stereotype.Component;
@Log4j2
@Component
@MessageEndpoint
public class ClientSocketHandler {
@ServiceActivator(inputChannel = "toTcp")
public byte[] handleMessage(byte[] msg) {
// TODO implement some buisiness logic here
return msg;
}
}
和外发消息。
import lombok.extern.log4j.Log4j2;
import org.springframework.integration.annotation.MessageEndpoint;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.stereotype.Component;
@Log4j2
@Component
@MessageEndpoint
public class ClientSocketHandler {
@ServiceActivator(inputChannel = "toTcp")
public byte[] handleMessage(byte[] msg) {
// implement some business logic here
return msg;
}
}
希望对您有所帮助。 ;-)