spring TCP/IP 服务器的启动处理
spring boot handling for TCP/IP server
必须实现一个服务器来通过以太网连接处理以下协议:
Establishing a connection
The client connects to the configured server via TCP / IP.
After the connection has been established, the client initially sends a heartbeat message to the
Server:
{
"MessageID": "Heartbeat"
}
Response:
{
"ResponseCode": "Ok"
}
Communication process
To maintain the connection, the client sends every 10 seconds when inactive
Heartbeat message.
Server and client must close the connection if they are not receiving a message for longer than 20 seconds.
An answer must be given within 5 seconds to request.
If no response is received, the connection must also be closed.
The protocol does not contain numbering or any other form of identification.
Communication partner when sending the responses makes sure that they are in the same sequence.
Message structure:
The messages are embedded in an STX-ETX frame.
STX (0x02) message ETX (0x03)
An `escaping` of STX and ETX within the message is not necessary since it is in JSON format
Escape sequence are following:
JSON.stringify ({"a": "\ x02 \ x03 \ x10"}) → "{" a \ ": " \ u0002 \ u0003 \ u0010 \ "}"
不应该只使用心跳消息。典型的消息应该是这样的:
{
"MessageID": "CheckAccess"
"Parameters": {
"MediaType": "type",
"MediaData": "data"
}
}
以及适当的回应:
{
"ResponseCode": "some-code",
"DisplayMessage": "some-message",
"SessionID": "some-id"
}
应该是多客户端服务器。而且协议没有任何标识。
但是,我们必须至少识别客户端发送它的 IP 地址。
找不到关于如何将此类服务器添加到 Spring 引导应用程序并在启动时启用并为其处理输入和输出逻辑的解决方案。
非常感谢任何建议。
解决方案
为 TCP 服务器配置如下:
@Slf4j
@Component
@RequiredArgsConstructor
public class TCPServer {
private final InetSocketAddress hostAddress;
private final ServerBootstrap serverBootstrap;
private Channel serverChannel;
@PostConstruct
public void start() {
try {
ChannelFuture serverChannelFuture = serverBootstrap.bind(hostAddress).sync();
log.info("Server is STARTED : port {}", hostAddress.getPort());
serverChannel = serverChannelFuture.channel().closeFuture().sync().channel();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
@PreDestroy
public void stop() {
if (serverChannel != null) {
serverChannel.close();
serverChannel.parent().close();
}
}
}
@PostConstruct
在应用程序启动期间启动服务器。
它的配置也是:
@Configuration
@RequiredArgsConstructor
@EnableConfigurationProperties(NettyProperties.class)
public class NettyConfiguration {
private final LoggingHandler loggingHandler = new LoggingHandler(LogLevel.DEBUG);
private final NettyProperties nettyProperties;
@Bean(name = "serverBootstrap")
public ServerBootstrap bootstrap(SimpleChannelInitializer initializer) {
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.group(bossGroup(), workerGroup())
.channel(NioServerSocketChannel.class)
.handler(loggingHandler)
.childHandler(initializer);
bootstrap.option(ChannelOption.SO_BACKLOG, nettyProperties.getBacklog());
bootstrap.childOption(ChannelOption.SO_KEEPALIVE, nettyProperties.isKeepAlive());
return bootstrap;
}
@Bean(destroyMethod = "shutdownGracefully")
public NioEventLoopGroup bossGroup() {
return new NioEventLoopGroup(nettyProperties.getBossCount());
}
@Bean(destroyMethod = "shutdownGracefully")
public NioEventLoopGroup workerGroup() {
return new NioEventLoopGroup(nettyProperties.getWorkerCount());
}
@Bean
@SneakyThrows
public InetSocketAddress tcpSocketAddress() {
return new InetSocketAddress(nettyProperties.getTcpPort());
}
}
初始化逻辑:
@Component
@RequiredArgsConstructor
public class SimpleChannelInitializer extends ChannelInitializer<SocketChannel> {
private final StringEncoder stringEncoder = new StringEncoder();
private final StringDecoder stringDecoder = new StringDecoder();
private final QrReaderProcessingHandler readerServerHandler;
private final NettyProperties nettyProperties;
@Override
protected void initChannel(SocketChannel socketChannel) {
ChannelPipeline pipeline = socketChannel.pipeline();
pipeline.addLast(new DelimiterBasedFrameDecoder(1024 * 1024, Delimiters.lineDelimiter()));
pipeline.addLast(new ReadTimeoutHandler(nettyProperties.getClientTimeout()));
pipeline.addLast(stringDecoder);
pipeline.addLast(stringEncoder);
pipeline.addLast(readerServerHandler);
}
}
属性配置:
@Getter
@Setter
@ConfigurationProperties(prefix = "netty")
public class NettyProperties {
@NotNull
@Size(min = 1000, max = 65535)
private int tcpPort;
@Min(1)
@NotNull
private int bossCount;
@Min(2)
@NotNull
private int workerCount;
@NotNull
private boolean keepAlive;
@NotNull
private int backlog;
@NotNull
private int clientTimeout;
}
以及来自 application.yml
的片段:
netty:
tcp-port: 9090
boss-count: 1
worker-count: 14
keep-alive: true
backlog: 128
client-timeout: 20
而且处理程序非常简单。
运行 在控制台本地检查:
telnet localhost 9090
在那里工作得很好。希望客户端访问没问题
由于该协议不是基于 HTTP(不像 WebSocket 首先搭载在 HTTP 上),您唯一的选择是自己使用 TCP 服务器 并将其连接到 spring 上下文中以充分利用 spring 以及.
Netty 以 low-level TCP/IP 通信而闻名,很容易将 Netty 服务器封装在 spring 应用程序中。
事实上,spring boot 提供了开箱即用的 Netty HTTP server 但是 这不是你需要的.
TCP communication server with Netty And SpringBoot 项目是您需要的一个简单有效的示例。
看看这个项目的 TCPServer,它使用 Netty 的 ServerBootstrap 来启动自定义 TCP 服务器。
一旦你有了服务器,你可以连接 Netty 编解码器 或 Jackson 或您认为适合您的应用程序域数据的任何其他消息转换器 marshalling/unmarshalling。
[更新 - 2020 年 7 月 17 日]
针对问题的更新理解(HTTP 和 TCP 请求都在同一端点终止),以下是更新的解决方案建议
----> HTTP Server (be_http)
|
----> HAProxy -
|
----> TCP Server (be_tcp)
此解决方案需要以下 changes/additions 个条件:
- 在您现有的 spring 启动应用程序中添加基于 Netty 的侦听器,或者为 TCP 服务器创建一个单独的 spring 启动应用程序。假设此端点正在侦听端口 9090
上的 TCP 流量
- 将 HAProxy 添加为入口流量的终止端点
- 配置 HAProxy,使其将所有 HTTP 流量发送到端口 8080
上现有的 spring 启动 HTTP 端点(称为 be_http)
- 配置 HAProxy,以便所有非 HTTP 流量都发送到端口 9090 上的新 TCP spring 引导端点(称为 be_tcp)。
以下 HAProxy 配置就足够了。这些是与此问题相关的摘录,请添加适用于正常 HAProxy 设置的其他 HAProxy 指令:
listen 443
mode tcp
bind :443 name tcpsvr
/* add other regular directives */
tcp-request inspect-delay 1s
tcp-request content accept if HTTP
tcp-request content accept if !HTTP
use-server be_http if HTTP
use-server be_tcp if !HTTP
/* backend server definition */
server be_http 127.0.0.1:8080
server be_tcp 127.0.0.1:9090 send-proxy
以下 HAProxy 文档链接特别有用
- Fetching samples from buffer contents - Layer 6
- Pre-defined ACLs
- tcp-request inspect-delay
- tcp-request content
我个人会玩弄验证tcp-requestinspect-delay并根据实际需要调整因为这有可能在最坏的情况下增加请求延迟,即已建立连接但尚无可用内容来评估请求是否为 HTTP。
为了满足 的需要,我们必须至少识别客户端发送它的 IP 地址 ,您可以选择在将其发送回后端时使用 Proxy Protocol。我已经更新了上面的示例配置以在 be_tcp 中包含代理协议(添加 send_proxy)。我还从 be_http 中删除了 send_proxy,因为 spring 引导不需要它,相反,您可能会依赖常规 X-Forwarded-For header 来 be_http 后端。
在 be_tcp 后端中,您可以使用 Netty 的 HAProxyMessage to get the actual source IP address using sourceAddress() API。总而言之,这是一个可行的解决方案。我自己使用了带有代理协议的 HAProxy(在前端和后端)并且它对于工作来说更加稳定。
必须实现一个服务器来通过以太网连接处理以下协议:
Establishing a connection
The client connects to the configured server via TCP / IP.
After the connection has been established, the client initially sends a heartbeat message to the
Server:
{
"MessageID": "Heartbeat"
}
Response:
{
"ResponseCode": "Ok"
}
Communication process
To maintain the connection, the client sends every 10 seconds when inactive
Heartbeat message.
Server and client must close the connection if they are not receiving a message for longer than 20 seconds.
An answer must be given within 5 seconds to request.
If no response is received, the connection must also be closed.
The protocol does not contain numbering or any other form of identification.
Communication partner when sending the responses makes sure that they are in the same sequence.
Message structure:
The messages are embedded in an STX-ETX frame.
STX (0x02) message ETX (0x03)
An `escaping` of STX and ETX within the message is not necessary since it is in JSON format
Escape sequence are following:
JSON.stringify ({"a": "\ x02 \ x03 \ x10"}) → "{" a \ ": " \ u0002 \ u0003 \ u0010 \ "}"
不应该只使用心跳消息。典型的消息应该是这样的:
{
"MessageID": "CheckAccess"
"Parameters": {
"MediaType": "type",
"MediaData": "data"
}
}
以及适当的回应:
{
"ResponseCode": "some-code",
"DisplayMessage": "some-message",
"SessionID": "some-id"
}
应该是多客户端服务器。而且协议没有任何标识。
但是,我们必须至少识别客户端发送它的 IP 地址。
找不到关于如何将此类服务器添加到 Spring 引导应用程序并在启动时启用并为其处理输入和输出逻辑的解决方案。
非常感谢任何建议。
解决方案
为 TCP 服务器配置如下:
@Slf4j
@Component
@RequiredArgsConstructor
public class TCPServer {
private final InetSocketAddress hostAddress;
private final ServerBootstrap serverBootstrap;
private Channel serverChannel;
@PostConstruct
public void start() {
try {
ChannelFuture serverChannelFuture = serverBootstrap.bind(hostAddress).sync();
log.info("Server is STARTED : port {}", hostAddress.getPort());
serverChannel = serverChannelFuture.channel().closeFuture().sync().channel();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
@PreDestroy
public void stop() {
if (serverChannel != null) {
serverChannel.close();
serverChannel.parent().close();
}
}
}
@PostConstruct
在应用程序启动期间启动服务器。
它的配置也是:
@Configuration
@RequiredArgsConstructor
@EnableConfigurationProperties(NettyProperties.class)
public class NettyConfiguration {
private final LoggingHandler loggingHandler = new LoggingHandler(LogLevel.DEBUG);
private final NettyProperties nettyProperties;
@Bean(name = "serverBootstrap")
public ServerBootstrap bootstrap(SimpleChannelInitializer initializer) {
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.group(bossGroup(), workerGroup())
.channel(NioServerSocketChannel.class)
.handler(loggingHandler)
.childHandler(initializer);
bootstrap.option(ChannelOption.SO_BACKLOG, nettyProperties.getBacklog());
bootstrap.childOption(ChannelOption.SO_KEEPALIVE, nettyProperties.isKeepAlive());
return bootstrap;
}
@Bean(destroyMethod = "shutdownGracefully")
public NioEventLoopGroup bossGroup() {
return new NioEventLoopGroup(nettyProperties.getBossCount());
}
@Bean(destroyMethod = "shutdownGracefully")
public NioEventLoopGroup workerGroup() {
return new NioEventLoopGroup(nettyProperties.getWorkerCount());
}
@Bean
@SneakyThrows
public InetSocketAddress tcpSocketAddress() {
return new InetSocketAddress(nettyProperties.getTcpPort());
}
}
初始化逻辑:
@Component
@RequiredArgsConstructor
public class SimpleChannelInitializer extends ChannelInitializer<SocketChannel> {
private final StringEncoder stringEncoder = new StringEncoder();
private final StringDecoder stringDecoder = new StringDecoder();
private final QrReaderProcessingHandler readerServerHandler;
private final NettyProperties nettyProperties;
@Override
protected void initChannel(SocketChannel socketChannel) {
ChannelPipeline pipeline = socketChannel.pipeline();
pipeline.addLast(new DelimiterBasedFrameDecoder(1024 * 1024, Delimiters.lineDelimiter()));
pipeline.addLast(new ReadTimeoutHandler(nettyProperties.getClientTimeout()));
pipeline.addLast(stringDecoder);
pipeline.addLast(stringEncoder);
pipeline.addLast(readerServerHandler);
}
}
属性配置:
@Getter
@Setter
@ConfigurationProperties(prefix = "netty")
public class NettyProperties {
@NotNull
@Size(min = 1000, max = 65535)
private int tcpPort;
@Min(1)
@NotNull
private int bossCount;
@Min(2)
@NotNull
private int workerCount;
@NotNull
private boolean keepAlive;
@NotNull
private int backlog;
@NotNull
private int clientTimeout;
}
以及来自 application.yml
的片段:
netty:
tcp-port: 9090
boss-count: 1
worker-count: 14
keep-alive: true
backlog: 128
client-timeout: 20
而且处理程序非常简单。
运行 在控制台本地检查:
telnet localhost 9090
在那里工作得很好。希望客户端访问没问题
由于该协议不是基于 HTTP(不像 WebSocket 首先搭载在 HTTP 上),您唯一的选择是自己使用 TCP 服务器 并将其连接到 spring 上下文中以充分利用 spring 以及.
Netty 以 low-level TCP/IP 通信而闻名,很容易将 Netty 服务器封装在 spring 应用程序中。
事实上,spring boot 提供了开箱即用的 Netty HTTP server 但是 这不是你需要的.
TCP communication server with Netty And SpringBoot 项目是您需要的一个简单有效的示例。
看看这个项目的 TCPServer,它使用 Netty 的 ServerBootstrap 来启动自定义 TCP 服务器。
一旦你有了服务器,你可以连接 Netty 编解码器 或 Jackson 或您认为适合您的应用程序域数据的任何其他消息转换器 marshalling/unmarshalling。
[更新 - 2020 年 7 月 17 日]
针对问题的更新理解(HTTP 和 TCP 请求都在同一端点终止),以下是更新的解决方案建议
----> HTTP Server (be_http) | ----> HAProxy - | ----> TCP Server (be_tcp)
此解决方案需要以下 changes/additions 个条件:
- 在您现有的 spring 启动应用程序中添加基于 Netty 的侦听器,或者为 TCP 服务器创建一个单独的 spring 启动应用程序。假设此端点正在侦听端口 9090 上的 TCP 流量
- 将 HAProxy 添加为入口流量的终止端点
- 配置 HAProxy,使其将所有 HTTP 流量发送到端口 8080 上现有的 spring 启动 HTTP 端点(称为 be_http)
- 配置 HAProxy,以便所有非 HTTP 流量都发送到端口 9090 上的新 TCP spring 引导端点(称为 be_tcp)。
以下 HAProxy 配置就足够了。这些是与此问题相关的摘录,请添加适用于正常 HAProxy 设置的其他 HAProxy 指令:
listen 443 mode tcp bind :443 name tcpsvr /* add other regular directives */ tcp-request inspect-delay 1s tcp-request content accept if HTTP tcp-request content accept if !HTTP use-server be_http if HTTP use-server be_tcp if !HTTP /* backend server definition */ server be_http 127.0.0.1:8080 server be_tcp 127.0.0.1:9090 send-proxy
以下 HAProxy 文档链接特别有用
- Fetching samples from buffer contents - Layer 6
- Pre-defined ACLs
- tcp-request inspect-delay
- tcp-request content
我个人会玩弄验证tcp-requestinspect-delay并根据实际需要调整因为这有可能在最坏的情况下增加请求延迟,即已建立连接但尚无可用内容来评估请求是否为 HTTP。
为了满足 的需要,我们必须至少识别客户端发送它的 IP 地址 ,您可以选择在将其发送回后端时使用 Proxy Protocol。我已经更新了上面的示例配置以在 be_tcp 中包含代理协议(添加 send_proxy)。我还从 be_http 中删除了 send_proxy,因为 spring 引导不需要它,相反,您可能会依赖常规 X-Forwarded-For header 来 be_http 后端。
在 be_tcp 后端中,您可以使用 Netty 的 HAProxyMessage to get the actual source IP address using sourceAddress() API。总而言之,这是一个可行的解决方案。我自己使用了带有代理协议的 HAProxy(在前端和后端)并且它对于工作来说更加稳定。