使用 Netty 处理 STX-ETX 帧

Handling STX-ETX frame with Netty

必须使用 Netty 实现以下 TCP/IP 协议实现:

Message structure:
The messages are embedded in an STX-ETX frame:

STX    MESSAGE      ETX
0x02   7b20224d...  0x03   

An `escaping` of STX and ETX within the message is not necessary since it is in JSON format

Escape sequence are following:

JSON.stringify ({"a": "\ x02 \ x03 \ x10"}) → "{" a \ ": " \ u0002 \ u0003 \ u0010 \ "}".

这里是关于 STX, ETX control codes 的更多信息。

消息的长度可能不同,它将具有 JSON 格式,类似于:

[=17=]x02{"messageID": "Heartbeat"}[=17=]x03

我的想法是将自定义框架定界符与 StringEncoder/StringDecoder 相结合。

对于自定义帧定界符 -> 使用 0x03 作为定界符并跳过第一个字节 (0x02)。

因此创建了以下 FrameDelimiterDecoder

@Slf4j
public class FrameDelimiterDecoder extends DelimiterBasedFrameDecoder {

    public FrameDelimiterDecoder(int maxFrameLength, ByteBuf delimiter) {
        super(maxFrameLength, delimiter);
    }

    @Override
    protected Object decode(ChannelHandlerContext ctx, ByteBuf buffer) throws Exception {
        ByteBuf buffFrame = null;

        Object frame = super.decode(ctx, buffer);
        if (frame instanceof ByteBuf) {
            buffFrame = (ByteBuf) frame;
        } else {
            log.info("frame: {}", frame);
        }

        if (buffFrame != null) {
            buffFrame.writeBytes(buffer.skipBytes(1));
        } else {
            log.warn("buffer is <null>");
        }

        return buffFrame;
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        log.error(cause.getMessage(), cause);
    }
}

并将其用于初始化:

@Slf4j
@Component
@RequiredArgsConstructor
public class QrReaderChannelInitializer extends ChannelInitializer<SocketChannel> {

    private final StringEncoder stringEncoder = new StringEncoder();
    private final StringDecoder stringDecoder = new StringDecoder();

    private final QrReaderProcessingHandler readerServerHandler;
    private final NettyProperties nettyProperties;

    @Override
    protected void initChannel(SocketChannel socketChannel) {
        ChannelPipeline pipeline = socketChannel.pipeline();

        pipeline.addLast(new FrameDelimiterDecoder(1024 * 1024, Unpooled.wrappedBuffer(FrameConstant.ETX)));

        if (nettyProperties.isEnableTimeout()) {
            pipeline.addLast(new ReadTimeoutHandler(nettyProperties.getClientTimeout()));
        }
        pipeline.addLast(stringDecoder);
        pipeline.addLast(stringEncoder);
        pipeline.addLast(readerServerHandler);
    }
}

然而,它总是失败:

c.s.netty.init.FrameDelimiterDecoder     : java.lang.IndexOutOfBoundsException: readerIndex(28) + length(1) exceeds writerIndex(28): PooledUnsafeDirectByteBuf(ridx: 28, widx: 28, cap: 1024)

io.netty.handler.codec.DecoderException: java.lang.IndexOutOfBoundsException: readerIndex(28) + length(1) exceeds writerIndex(28): PooledUnsafeDirectByteBuf(ridx: 28, widx: 28, cap: 1024)
    at io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:471)

无法理解那里缺少什么。

如何使用Netty处理request/response的STX-ETX帧?

经过多次尝试失败,解决了这个问题。

重新思考 FrameDelimiterDecoder 的代码,找到了如何使用字节数组并在最后转换为 ByteBuf 的方法。我相信可以直接使用缓冲区或使用 NIO 包中的 ByteBuffer 然后转换来完成。

对我来说最简单的是:

@Slf4j
public class FrameDelimiterDecoder extends DelimiterBasedFrameDecoder {
    public FrameDelimiterDecoder(int maxFrameLength, ByteBuf delimiter) {
        super(maxFrameLength, delimiter);
    }

    @Override
    protected Object decode(ChannelHandlerContext ctx, ByteBuf buffer) {
        boolean inMessage = false;

        int size = buffer.readableBytes();

        ByteBuffer byteBuffer = ByteBuffer.allocate(size);
        buffer.readBytes(byteBuffer);

        byte[] byteArray = new byte[size - 2];
        byte[] data = byteBuffer.array();

        int index = 0;
        for (byte b : data) {
            if (b == FrameConstant.START_OF_TEXT) {
                if (!inMessage) {
                    inMessage = true;
                } else {
                    log.warn("Unexpected STX received!");
                }
            } else if (b == FrameConstant.END_OF_TEXT) {
                if (inMessage) {
                    inMessage = false;
                } else {
                    log.warn("Unexpected ETX received!");
                }
            } else {
                if (inMessage) {
                    byteArray[index] = b;
                    index += 1;
                }
            }
        }

        return Unpooled.wrappedBuffer(byteArray);
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        if (cause instanceof InterruptedException) {
            log.warn("interrupted exception occurred");
            Thread.currentThread().interrupt();
        } else {
            log.error("FrameDelimiterEncoder exception occurred:", cause);
        }
    }
}

其中 FrameConstant 看起来像:

@UtilityClass
public class FrameConstant {
    public final int START_OF_TEXT = 0x02;
    public final int END_OF_TEXT = 0x03;

    public final int MAX_FRAME_LENGTH = 1024 * 1024;
}

然后初始化它:

@Slf4j
@Component
@RequiredArgsConstructor
public class QrReaderChannelInitializer extends ChannelInitializer<SocketChannel> {
    private final StringEncoder stringEncoder = new StringEncoder();
    private final StringDecoder stringDecoder = new StringDecoder();

    private final QrReaderProcessingHandler readerServerHandler;
    private final NettyProperties nettyProperties;

    @Override
    protected void initChannel(SocketChannel socketChannel) {
        ChannelPipeline pipeline = socketChannel.pipeline();

        // Add the delimiter first
        pipeline.addLast(getDelimiterDecoder());

        if (nettyProperties.isEnableTimeout()) {
            pipeline.addLast(new ReadTimeoutHandler(nettyProperties.getClientTimeout()));
        }
        pipeline.addLast(stringDecoder);
        pipeline.addLast(stringEncoder);
        pipeline.addLast(readerServerHandler);
    }

    private FrameDelimiterDecoder getDelimiterDecoder() {
        ByteBuf delimiter = Unpooled.wrappedBuffer(new byte[]{FrameConstant.END_OF_TEXT});
        return new FrameDelimiterDecoder(FrameConstant.MAX_FRAME_LENGTH, delimiter);
    }
}

以及处理程序的一些修改:

@Slf4j
@Component
@RequiredArgsConstructor
@ChannelHandler.Sharable
public class QrReaderProcessingHandler extends ChannelInboundHandlerAdapter {
    private final PermissionService permissionService;
    private final EntranceService entranceService;
    private final Gson gson;

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        String remoteAddress = ctx.channel().remoteAddress().toString();
        String stringMsg = (String) msg;

        if (log.isDebugEnabled()) {
            log.debug("CLIENT_IP: {}", remoteAddress);
            log.debug("CLIENT_REQUEST: {}", stringMsg);
        }

        if (HEARTBEAT.containsName(stringMsg)) {
            HeartbeatResponse heartbeatResponse = buildHeartbeatResponse();
            sendResponse(ctx, heartbeatResponse);

        } 
    }

    private <T> void sendResponse(ChannelHandlerContext ctx, T response) {
        ctx.writeAndFlush(formatResponse(response));
    }

    private <T> String formatResponse(T response) {
        String realResponse = String.format("%s%s%s",
                (char) FrameConstant.START_OF_TEXT,
                gson.toJson(response),
                (char) FrameConstant.END_OF_TEXT);

        log.debug("response: {}", realResponse);
        return realResponse;
    }

最后,它发送回格式正确的响应: