使用 Netty 处理 STX-ETX 帧
Handling STX-ETX frame with Netty
必须使用 Netty 实现以下 TCP/IP 协议:
Message structure:
The messages are embedded in an STX-ETX frame:
STX MESSAGE ETX
0x02 7b20224d... 0x03
An `escaping` of STX and ETX within the message is not necessary since it is in JSON format
The escape sequences are as follows:
JSON.stringify({"a": "\x02\x03\x10"}) → "{\"a\":\"\u0002\u0003\u0010\"}".
这里是关于 STX, ETX control codes 的更多信息。
消息的长度可能不同,它将具有 JSON 格式,类似于:
0x02{"messageID": "Heartbeat"}0x03
我的想法是将自定义框架定界符与 StringEncoder/StringDecoder 相结合。
对于自定义帧定界符 -> 使用 0x03
作为定界符并跳过第一个字节 (0x02)。
因此创建了以下 FrameDelimiterDecoder:
/**
 * Frame decoder for messages wrapped as {@code STX payload ETX}.
 *
 * <p>Splitting on the delimiter is delegated to {@link DelimiterBasedFrameDecoder};
 * this class only removes the leading STX byte from each frame the superclass
 * hands back, so downstream handlers receive the bare payload.
 */
@Slf4j
public class FrameDelimiterDecoder extends DelimiterBasedFrameDecoder {

    /** Start-of-text control byte that opens every frame. */
    private static final byte STX = 0x02;

    /**
     * @param maxFrameLength maximum frame length accepted by the superclass
     * @param delimiter      frame delimiter; the caller is expected to pass the ETX byte
     */
    public FrameDelimiterDecoder(int maxFrameLength, ByteBuf delimiter) {
        super(maxFrameLength, delimiter);
    }

    /**
     * Decodes one delimited frame and strips its leading STX byte.
     *
     * @return the payload without STX, or {@code null} when no complete frame
     *         has been received yet
     */
    @Override
    protected Object decode(ChannelHandlerContext ctx, ByteBuf buffer) throws Exception {
        Object decoded = super.decode(ctx, buffer);
        if (!(decoded instanceof ByteBuf)) {
            // No complete frame yet; this is the normal "need more data" case.
            return null;
        }

        ByteBuf frame = (ByteBuf) decoded;
        // The STX belongs to the extracted frame, not to the input buffer: by now the
        // superclass has already consumed the frame and its delimiter from 'buffer',
        // which may legitimately be empty. Skipping there caused the
        // IndexOutOfBoundsException (or swallowed the next frame's first byte).
        if (frame.isReadable() && frame.getByte(frame.readerIndex()) == STX) {
            frame.skipBytes(1);
        } else {
            log.warn("Frame does not start with STX; passing it on unchanged");
        }
        return frame;
    }

    /** Logs the failure; the event is not propagated further down the pipeline. */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        log.error(cause.getMessage(), cause);
    }
}
并将其用于初始化:
/**
 * Builds the pipeline for each accepted channel: frame decoder, optional read
 * timeout, string decoder/encoder and finally the processing handler.
 */
@Slf4j
@Component
@RequiredArgsConstructor
public class QrReaderChannelInitializer extends ChannelInitializer<SocketChannel> {
    private final StringEncoder stringEncoder = new StringEncoder();
    private final StringDecoder stringDecoder = new StringDecoder();
    private final QrReaderProcessingHandler readerServerHandler;
    private final NettyProperties nettyProperties;

    /**
     * Installs the handlers on the channel's pipeline in processing order.
     *
     * @param socketChannel the channel being initialized
     */
    @Override
    protected void initChannel(SocketChannel socketChannel) {
        ChannelPipeline pipeline = socketChannel.pipeline();

        // Framing comes first so everything behind it sees whole messages.
        ByteBuf etxDelimiter = Unpooled.wrappedBuffer(FrameConstant.ETX);
        pipeline.addLast(new FrameDelimiterDecoder(1024 * 1024, etxDelimiter));

        // The read timeout is only installed when enabled in the properties.
        boolean timeoutEnabled = nettyProperties.isEnableTimeout();
        if (timeoutEnabled) {
            pipeline.addLast(new ReadTimeoutHandler(nettyProperties.getClientTimeout()));
        }

        pipeline.addLast(stringDecoder)
                .addLast(stringEncoder)
                .addLast(readerServerHandler);
    }
}
然而,它总是失败:
c.s.netty.init.FrameDelimiterDecoder : java.lang.IndexOutOfBoundsException: readerIndex(28) + length(1) exceeds writerIndex(28): PooledUnsafeDirectByteBuf(ridx: 28, widx: 28, cap: 1024)
io.netty.handler.codec.DecoderException: java.lang.IndexOutOfBoundsException: readerIndex(28) + length(1) exceeds writerIndex(28): PooledUnsafeDirectByteBuf(ridx: 28, widx: 28, cap: 1024)
at io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:471)
无法理解那里缺少什么。
如何使用Netty处理request/response的STX-ETX帧?
经过多次尝试失败,解决了这个问题。
重新思考 FrameDelimiterDecoder
的代码,找到了如何使用字节数组并在最后转换为 ByteBuf
的方法。我相信可以直接使用缓冲区或使用 NIO 包中的 ByteBuffer 然后转换来完成。
对我来说最简单的是:
@Slf4j
public class FrameDelimiterDecoder extends DelimiterBasedFrameDecoder {
public FrameDelimiterDecoder(int maxFrameLength, ByteBuf delimiter) {
super(maxFrameLength, delimiter);
}
@Override
protected Object decode(ChannelHandlerContext ctx, ByteBuf buffer) {
boolean inMessage = false;
int size = buffer.readableBytes();
ByteBuffer byteBuffer = ByteBuffer.allocate(size);
buffer.readBytes(byteBuffer);
byte[] byteArray = new byte[size - 2];
byte[] data = byteBuffer.array();
int index = 0;
for (byte b : data) {
if (b == FrameConstant.START_OF_TEXT) {
if (!inMessage) {
inMessage = true;
} else {
log.warn("Unexpected STX received!");
}
} else if (b == FrameConstant.END_OF_TEXT) {
if (inMessage) {
inMessage = false;
} else {
log.warn("Unexpected ETX received!");
}
} else {
if (inMessage) {
byteArray[index] = b;
index += 1;
}
}
}
return Unpooled.wrappedBuffer(byteArray);
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
if (cause instanceof InterruptedException) {
log.warn("interrupted exception occurred");
Thread.currentThread().interrupt();
} else {
log.error("FrameDelimiterEncoder exception occurred:", cause);
}
}
}
其中 FrameConstant
看起来像:
/** Constants shared by the STX-ETX framing code. */
@UtilityClass
public class FrameConstant {
    /** STX control code (0x02) that opens a frame. */
    public final int START_OF_TEXT = 0x02;

    /** ETX control code (0x03) that closes a frame. */
    public final int END_OF_TEXT = 0x03;

    /** Upper bound for a single frame: 1 MiB. */
    public final int MAX_FRAME_LENGTH = 1 << 20;
}
然后初始化它:
/**
 * Builds the pipeline for each accepted channel: ETX-delimited frame decoder,
 * optional read timeout, string decoder/encoder and the processing handler.
 */
@Slf4j
@Component
@RequiredArgsConstructor
public class QrReaderChannelInitializer extends ChannelInitializer<SocketChannel> {
    private final StringEncoder stringEncoder = new StringEncoder();
    private final StringDecoder stringDecoder = new StringDecoder();
    private final QrReaderProcessingHandler readerServerHandler;
    private final NettyProperties nettyProperties;

    /**
     * Installs the handlers on the channel's pipeline in processing order.
     *
     * @param socketChannel the channel being initialized
     */
    @Override
    protected void initChannel(SocketChannel socketChannel) {
        ChannelPipeline pipeline = socketChannel.pipeline();

        // The frame decoder goes first so later handlers see whole messages.
        pipeline.addLast(newFrameDecoder());

        // The read timeout is only installed when enabled in the properties.
        boolean timeoutEnabled = nettyProperties.isEnableTimeout();
        if (timeoutEnabled) {
            pipeline.addLast(new ReadTimeoutHandler(nettyProperties.getClientTimeout()));
        }

        pipeline.addLast(stringDecoder)
                .addLast(stringEncoder)
                .addLast(readerServerHandler);
    }

    /** Creates a fresh decoder per channel, delimited by the single ETX byte. */
    private FrameDelimiterDecoder newFrameDecoder() {
        byte[] etx = new byte[]{FrameConstant.END_OF_TEXT};
        return new FrameDelimiterDecoder(FrameConstant.MAX_FRAME_LENGTH, Unpooled.wrappedBuffer(etx));
    }
}
以及处理程序的一些修改:
@Slf4j
@Component
@RequiredArgsConstructor
@ChannelHandler.Sharable
public class QrReaderProcessingHandler extends ChannelInboundHandlerAdapter {
private final PermissionService permissionService;
private final EntranceService entranceService;
private final Gson gson;
/**
 * Handles one inbound message: logs it at debug level and answers heartbeat
 * requests. Other messages get no reply here and are not forwarded.
 *
 * @param ctx channel context used for the reply
 * @param msg inbound message; cast to {@code String} (presumably produced by
 *            an upstream string decoder; verify against the pipeline setup)
 */
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
    String clientAddress = ctx.channel().remoteAddress().toString();
    String request = (String) msg;

    if (log.isDebugEnabled()) {
        log.debug("CLIENT_IP: {}", clientAddress);
        log.debug("CLIENT_REQUEST: {}", request);
    }

    // Only heartbeat requests are answered.
    if (!HEARTBEAT.containsName(request)) {
        return;
    }
    HeartbeatResponse reply = buildHeartbeatResponse();
    sendResponse(ctx, reply);
}
/**
 * Formats the response and writes it to the channel, flushing immediately.
 *
 * @param ctx      channel context to write to
 * @param response response object to format and send
 */
private <T> void sendResponse(ChannelHandlerContext ctx, T response) {
    String framed = formatResponse(response);
    ctx.writeAndFlush(framed);
}
/**
 * Serializes the response with Gson and wraps it as {@code STX json ETX}.
 *
 * @param response response object to serialize
 * @return the framed response string
 */
private <T> String formatResponse(T response) {
    String json = gson.toJson(response);
    char stx = (char) FrameConstant.START_OF_TEXT;
    char etx = (char) FrameConstant.END_OF_TEXT;

    String realResponse = String.valueOf(stx) + json + etx;
    log.debug("response: {}", realResponse);
    return realResponse;
}
最后,它发送回格式正确的响应:
必须使用 Netty 实现以下 TCP/IP 协议:
Message structure:
The messages are embedded in an STX-ETX frame:
STX MESSAGE ETX
0x02 7b20224d... 0x03
An `escaping` of STX and ETX within the message is not necessary since it is in JSON format
The escape sequences are as follows:
JSON.stringify({"a": "\x02\x03\x10"}) → "{\"a\":\"\u0002\u0003\u0010\"}".
这里是关于 STX, ETX control codes 的更多信息。
消息的长度可能不同,它将具有 JSON 格式,类似于:
0x02{"messageID": "Heartbeat"}0x03
我的想法是将自定义框架定界符与 StringEncoder/StringDecoder 相结合。
对于自定义帧定界符 -> 使用 0x03
作为定界符并跳过第一个字节 (0x02)。
因此创建了以下 FrameDelimiterDecoder:
/**
 * Frame decoder for messages wrapped as {@code STX payload ETX}.
 *
 * <p>Splitting on the delimiter is delegated to {@link DelimiterBasedFrameDecoder};
 * this class only removes the leading STX byte from each frame the superclass
 * hands back, so downstream handlers receive the bare payload.
 */
@Slf4j
public class FrameDelimiterDecoder extends DelimiterBasedFrameDecoder {

    /** Start-of-text control byte that opens every frame. */
    private static final byte STX = 0x02;

    /**
     * @param maxFrameLength maximum frame length accepted by the superclass
     * @param delimiter      frame delimiter; the caller is expected to pass the ETX byte
     */
    public FrameDelimiterDecoder(int maxFrameLength, ByteBuf delimiter) {
        super(maxFrameLength, delimiter);
    }

    /**
     * Decodes one delimited frame and strips its leading STX byte.
     *
     * @return the payload without STX, or {@code null} when no complete frame
     *         has been received yet
     */
    @Override
    protected Object decode(ChannelHandlerContext ctx, ByteBuf buffer) throws Exception {
        Object decoded = super.decode(ctx, buffer);
        if (!(decoded instanceof ByteBuf)) {
            // No complete frame yet; this is the normal "need more data" case.
            return null;
        }

        ByteBuf frame = (ByteBuf) decoded;
        // The STX belongs to the extracted frame, not to the input buffer: by now the
        // superclass has already consumed the frame and its delimiter from 'buffer',
        // which may legitimately be empty. Skipping there caused the
        // IndexOutOfBoundsException (or swallowed the next frame's first byte).
        if (frame.isReadable() && frame.getByte(frame.readerIndex()) == STX) {
            frame.skipBytes(1);
        } else {
            log.warn("Frame does not start with STX; passing it on unchanged");
        }
        return frame;
    }

    /** Logs the failure; the event is not propagated further down the pipeline. */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        log.error(cause.getMessage(), cause);
    }
}
并将其用于初始化:
/**
 * Builds the pipeline for each accepted channel: frame decoder, optional read
 * timeout, string decoder/encoder and finally the processing handler.
 */
@Slf4j
@Component
@RequiredArgsConstructor
public class QrReaderChannelInitializer extends ChannelInitializer<SocketChannel> {
    private final StringEncoder stringEncoder = new StringEncoder();
    private final StringDecoder stringDecoder = new StringDecoder();
    private final QrReaderProcessingHandler readerServerHandler;
    private final NettyProperties nettyProperties;

    /**
     * Installs the handlers on the channel's pipeline in processing order.
     *
     * @param socketChannel the channel being initialized
     */
    @Override
    protected void initChannel(SocketChannel socketChannel) {
        ChannelPipeline pipeline = socketChannel.pipeline();

        // Framing comes first so everything behind it sees whole messages.
        ByteBuf etxDelimiter = Unpooled.wrappedBuffer(FrameConstant.ETX);
        pipeline.addLast(new FrameDelimiterDecoder(1024 * 1024, etxDelimiter));

        // The read timeout is only installed when enabled in the properties.
        boolean timeoutEnabled = nettyProperties.isEnableTimeout();
        if (timeoutEnabled) {
            pipeline.addLast(new ReadTimeoutHandler(nettyProperties.getClientTimeout()));
        }

        pipeline.addLast(stringDecoder)
                .addLast(stringEncoder)
                .addLast(readerServerHandler);
    }
}
然而,它总是失败:
c.s.netty.init.FrameDelimiterDecoder : java.lang.IndexOutOfBoundsException: readerIndex(28) + length(1) exceeds writerIndex(28): PooledUnsafeDirectByteBuf(ridx: 28, widx: 28, cap: 1024)
io.netty.handler.codec.DecoderException: java.lang.IndexOutOfBoundsException: readerIndex(28) + length(1) exceeds writerIndex(28): PooledUnsafeDirectByteBuf(ridx: 28, widx: 28, cap: 1024)
at io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:471)
无法理解那里缺少什么。
如何使用Netty处理request/response的STX-ETX帧?
经过多次尝试失败,解决了这个问题。
重新思考 FrameDelimiterDecoder
的代码,找到了如何使用字节数组并在最后转换为 ByteBuf
的方法。我相信可以直接使用缓冲区或使用 NIO 包中的 ByteBuffer 然后转换来完成。
对我来说最简单的是:
@Slf4j
public class FrameDelimiterDecoder extends DelimiterBasedFrameDecoder {
public FrameDelimiterDecoder(int maxFrameLength, ByteBuf delimiter) {
super(maxFrameLength, delimiter);
}
@Override
protected Object decode(ChannelHandlerContext ctx, ByteBuf buffer) {
boolean inMessage = false;
int size = buffer.readableBytes();
ByteBuffer byteBuffer = ByteBuffer.allocate(size);
buffer.readBytes(byteBuffer);
byte[] byteArray = new byte[size - 2];
byte[] data = byteBuffer.array();
int index = 0;
for (byte b : data) {
if (b == FrameConstant.START_OF_TEXT) {
if (!inMessage) {
inMessage = true;
} else {
log.warn("Unexpected STX received!");
}
} else if (b == FrameConstant.END_OF_TEXT) {
if (inMessage) {
inMessage = false;
} else {
log.warn("Unexpected ETX received!");
}
} else {
if (inMessage) {
byteArray[index] = b;
index += 1;
}
}
}
return Unpooled.wrappedBuffer(byteArray);
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
if (cause instanceof InterruptedException) {
log.warn("interrupted exception occurred");
Thread.currentThread().interrupt();
} else {
log.error("FrameDelimiterEncoder exception occurred:", cause);
}
}
}
其中 FrameConstant
看起来像:
/** Constants shared by the STX-ETX framing code. */
@UtilityClass
public class FrameConstant {
    /** STX control code (0x02) that opens a frame. */
    public final int START_OF_TEXT = 0x02;

    /** ETX control code (0x03) that closes a frame. */
    public final int END_OF_TEXT = 0x03;

    /** Upper bound for a single frame: 1 MiB. */
    public final int MAX_FRAME_LENGTH = 1 << 20;
}
然后初始化它:
/**
 * Builds the pipeline for each accepted channel: ETX-delimited frame decoder,
 * optional read timeout, string decoder/encoder and the processing handler.
 */
@Slf4j
@Component
@RequiredArgsConstructor
public class QrReaderChannelInitializer extends ChannelInitializer<SocketChannel> {
    private final StringEncoder stringEncoder = new StringEncoder();
    private final StringDecoder stringDecoder = new StringDecoder();
    private final QrReaderProcessingHandler readerServerHandler;
    private final NettyProperties nettyProperties;

    /**
     * Installs the handlers on the channel's pipeline in processing order.
     *
     * @param socketChannel the channel being initialized
     */
    @Override
    protected void initChannel(SocketChannel socketChannel) {
        ChannelPipeline pipeline = socketChannel.pipeline();

        // The frame decoder goes first so later handlers see whole messages.
        pipeline.addLast(newFrameDecoder());

        // The read timeout is only installed when enabled in the properties.
        boolean timeoutEnabled = nettyProperties.isEnableTimeout();
        if (timeoutEnabled) {
            pipeline.addLast(new ReadTimeoutHandler(nettyProperties.getClientTimeout()));
        }

        pipeline.addLast(stringDecoder)
                .addLast(stringEncoder)
                .addLast(readerServerHandler);
    }

    /** Creates a fresh decoder per channel, delimited by the single ETX byte. */
    private FrameDelimiterDecoder newFrameDecoder() {
        byte[] etx = new byte[]{FrameConstant.END_OF_TEXT};
        return new FrameDelimiterDecoder(FrameConstant.MAX_FRAME_LENGTH, Unpooled.wrappedBuffer(etx));
    }
}
以及处理程序的一些修改:
@Slf4j
@Component
@RequiredArgsConstructor
@ChannelHandler.Sharable
public class QrReaderProcessingHandler extends ChannelInboundHandlerAdapter {
private final PermissionService permissionService;
private final EntranceService entranceService;
private final Gson gson;
/**
 * Handles one inbound message: logs it at debug level and answers heartbeat
 * requests. Other messages get no reply here and are not forwarded.
 *
 * @param ctx channel context used for the reply
 * @param msg inbound message; cast to {@code String} (presumably produced by
 *            an upstream string decoder; verify against the pipeline setup)
 */
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
    String clientAddress = ctx.channel().remoteAddress().toString();
    String request = (String) msg;

    if (log.isDebugEnabled()) {
        log.debug("CLIENT_IP: {}", clientAddress);
        log.debug("CLIENT_REQUEST: {}", request);
    }

    // Only heartbeat requests are answered.
    if (!HEARTBEAT.containsName(request)) {
        return;
    }
    HeartbeatResponse reply = buildHeartbeatResponse();
    sendResponse(ctx, reply);
}
/**
 * Formats the response and writes it to the channel, flushing immediately.
 *
 * @param ctx      channel context to write to
 * @param response response object to format and send
 */
private <T> void sendResponse(ChannelHandlerContext ctx, T response) {
    String framed = formatResponse(response);
    ctx.writeAndFlush(framed);
}
/**
 * Serializes the response with Gson and wraps it as {@code STX json ETX}.
 *
 * @param response response object to serialize
 * @return the framed response string
 */
private <T> String formatResponse(T response) {
    String json = gson.toJson(response);
    char stx = (char) FrameConstant.START_OF_TEXT;
    char etx = (char) FrameConstant.END_OF_TEXT;

    String realResponse = String.valueOf(stx) + json + etx;
    log.debug("response: {}", realResponse);
    return realResponse;
}
最后,它发送回格式正确的响应: