netty包死循环发生

netty packet infinite loop occur

项目设置netty 4.1.65final + kotlin 1.5 + spring boot 2.5

服务器部署环境使用的是kubernets,dockerbaseimage使用的是alpine-adopt-openjdk-11.0.11.

重启Netty服务器的过程中,tcp包无限进入

是同一个netty通道发生的,我在通道上查看remoteAddress中的端口,连接到服务器浮动的地方,搜索“netstat-an”,找不到端口号.

当相同的数据包进入时,我尝试关闭通道,但通道没有关闭。

我认为在 netty 中的特定通道上存在无限循环,那是为什么?

在我的服务器日志下方

{"log_level":"INFO","time":"2021-07-14T03:22:31.376Z","log_type":"TCP_LOG","msg":"@@ duplicate packet. channel close"}
{"log_level":"INFO","time":"2021-07-14T03:22:31.376Z","log_type":"TCP_LOG","data_type":"PACKET","byte_array":"AAAA0024450560000000455400002026403695A46000FFFFCBAC","remote_ip":"/10.240.2.129:37508","channel_id":"a437de98"}
{"log_level":"INFO","time":"2021-07-14T03:22:31.388Z","log_type":"TCP_LOG","transfer_type":"SEND","data_type":"HEX","byte_array":"aaaa000c458560000000115a"}
{"log_level":"INFO","time":"2021-07-14T03:22:31.388Z","log_type":"TCP_LOG","msg":"@@ duplicate packet. channel close"}
{"log_level":"INFO","time":"2021-07-14T03:22:31.388Z","log_type":"TCP_LOG","data_type":"PACKET","byte_array":"AAAA0024450560000000455400002026403695A46000FFFFCBAC","remote_ip":"/10.240.2.129:37508","channel_id":"a437de98"}
{"log_level":"INFO","time":"2021-07-14T03:22:31.393Z","log_type":"TCP_LOG","transfer_type":"SEND","data_type":"HEX","byte_array":"aaaa000c458560000000115a"}

额外

用 LengthFieldBasedFrameDecoder 替换解码器没有引起任何问题。

当ReplayingDecoder完成接收数据包时,它应该清空buf,但我怀疑是没有清空造成的。

下面是我写的Replaying Decoder

enum class PacketDecoderState {
    READ_HEADER, READ_WHOLE
}

@ExperimentalUnsignedTypes
class PacketReceiverHandler : ReplayingDecoder<PacketDecoderState>(PacketDecoderState.READ_HEADER) {
    private var length: Int = 0
    override fun decode(ctx: ChannelHandlerContext, buf: ByteBuf, out: MutableList<Any>) {
        when (state()) {
            PacketDecoderState.READ_HEADER -> {
                println("READ HEAD :: $length | ${ctx.channel().remoteAddress()} | ${ctx.channel().id()} | $this | ${Thread.currentThread().name}")
                val headerPacket = buf.readBytes(HeaderSize.TOTAL_HEAD_SIZE)

                val startCode = ByteBufUtil.getBytes(headerPacket.readBytes(HeaderSize.START_CODE.length))
                startCode.takeIf { !it.contentEquals(START_CODE) }?.let {
                    ctx.channel().close()
                    val array = ByteArray(buf.readableBytes())
                    buf.getBytes(buf.readerIndex(), array)
                    throw TcpPacketValidationCheckException(TcpError(errorCode = ErrorCode.P005, byteArray = array))
                }
                length = headerPacket.readBytes(HeaderSize.PACKET_LENGTH.length).getUnsignedShort(0)
                checkpoint(PacketDecoderState.READ_WHOLE)

                buf.resetReaderIndex()
            }

            PacketDecoderState.READ_WHOLE -> {
                println("READ_WHOLE :: $length | ${ctx.channel().remoteAddress()} | ${ctx.channel().id()} | $this | ${Thread.currentThread().name} ")

                val bodyPacket = buf.readBytes(length)
                checkpoint(PacketDecoderState.READ_HEADER)
                out.add(bodyPacket)
            }
            else -> throw ProtocolsException(AppError(ErrorCode.P000))
        }
    }

}

当有重复的数据包进来时,在读取头信息和重置读取索引时发生无限循环。

对于普通数据包

aaaa00244505f4000000754e672f4a78435562716b6d71755a636f3253496a673d3d51ea

前10个字节是头信息。

但是当服务器重新启动时,数据包堆积在缓冲区中并立即进来。 aaaa00244505f4000000754e672f4a78435562716b6d71755a636f3253496a673d3d51eaaaaa00244505f4000000754e672f4a78435562716b6d71755a636dea15[]

两个数据包一个接一个地堆积在ByteBuf中

ReplayingDecoder内部操作

@Override
protected void callDecode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {
replayable.setCumulation(in);
try {
while (in.isReadable()) {
int oldReaderIndex = checkpoint = in.readerIndex();

while (in.isReadable()) 因为ByteBuf还有数据,所以保持运行.

读取我写的代码中的header信息,buf.ResetReaderIndex()初始化ByteBuf的读取索引为0,导致死循环