netty包死循环发生
netty packet infinite loop occur
项目设置netty 4.1.65final + kotlin 1.5 + spring boot 2.5
服务器部署环境使用的是kubernets,dockerbaseimage使用的是alpine-adopt-openjdk-11.0.11.
重启Netty服务器的过程中,tcp包无限进入
是同一个netty通道发生的,我在通道上查看remoteAddress中的端口,连接到服务器浮动的地方,搜索“netstat-an”,找不到端口号.
当相同的数据包进入时,我尝试关闭通道,但通道没有关闭。
我认为在 netty 中的特定通道上存在无限循环,那是为什么?
在我的服务器日志下方
{"log_level":"INFO","time":"2021-07-14T03:22:31.376Z","log_type":"TCP_LOG","msg":"@@ duplicate packet. channel close"}
{"log_level":"INFO","time":"2021-07-14T03:22:31.376Z","log_type":"TCP_LOG","data_type":"PACKET","byte_array":"AAAA0024450560000000455400002026403695A46000FFFFCBAC","remote_ip":"/10.240.2.129:37508","channel_id":"a437de98"}
{"log_level":"INFO","time":"2021-07-14T03:22:31.388Z","log_type":"TCP_LOG","transfer_type":"SEND","data_type":"HEX","byte_array":"aaaa000c458560000000115a"}
{"log_level":"INFO","time":"2021-07-14T03:22:31.388Z","log_type":"TCP_LOG","msg":"@@ duplicate packet. channel close"}
{"log_level":"INFO","time":"2021-07-14T03:22:31.388Z","log_type":"TCP_LOG","data_type":"PACKET","byte_array":"AAAA0024450560000000455400002026403695A46000FFFFCBAC","remote_ip":"/10.240.2.129:37508","channel_id":"a437de98"}
{"log_level":"INFO","time":"2021-07-14T03:22:31.393Z","log_type":"TCP_LOG","transfer_type":"SEND","data_type":"HEX","byte_array":"aaaa000c458560000000115a"}
额外
用 LengthFieldBasedFrameDecoder 替换解码器没有引起任何问题。
当ReplayingDecoder完成接收数据包时,它应该清空buf,但我怀疑是没有清空造成的。
下面是我写的Replaying Decoder
enum class PacketDecoderState {
READ_HEADER, READ_WHOLE
}
@ExperimentalUnsignedTypes
class PacketReceiverHandler : ReplayingDecoder<PacketDecoderState>(PacketDecoderState.READ_HEADER) {
private var length: Int = 0
override fun decode(ctx: ChannelHandlerContext, buf: ByteBuf, out: MutableList<Any>) {
when (state()) {
PacketDecoderState.READ_HEADER -> {
println("READ HEAD :: $length | ${ctx.channel().remoteAddress()} | ${ctx.channel().id()} | $this | ${Thread.currentThread().name}")
val headerPacket = buf.readBytes(HeaderSize.TOTAL_HEAD_SIZE)
val startCode = ByteBufUtil.getBytes(headerPacket.readBytes(HeaderSize.START_CODE.length))
startCode.takeIf { !it.contentEquals(START_CODE) }?.let {
ctx.channel().close()
val array = ByteArray(buf.readableBytes())
buf.getBytes(buf.readerIndex(), array)
throw TcpPacketValidationCheckException(TcpError(errorCode = ErrorCode.P005, byteArray = array))
}
length = headerPacket.readBytes(HeaderSize.PACKET_LENGTH.length).getUnsignedShort(0)
checkpoint(PacketDecoderState.READ_WHOLE)
buf.resetReaderIndex()
}
PacketDecoderState.READ_WHOLE -> {
println("READ_WHOLE :: $length | ${ctx.channel().remoteAddress()} | ${ctx.channel().id()} | $this | ${Thread.currentThread().name} ")
val bodyPacket = buf.readBytes(length)
checkpoint(PacketDecoderState.READ_HEADER)
out.add(bodyPacket)
}
else -> throw ProtocolsException(AppError(ErrorCode.P000))
}
}
}
当有重复的数据包进来时,在读取头信息和重置读取索引时发生无限循环。
对于普通数据包
aaaa00244505f4000000754e672f4a78435562716b6d71755a636f3253496a673d3d51ea
前10个字节是头信息。
但是当服务器重新启动时,数据包堆积在缓冲区中并立即进来。
aaaa00244505f4000000754e672f4a78435562716b6d71755a636f3253496a673d3d51eaaaaa00244505f4000000754e672f4a78435562716b6d71755a636dea15[]
两个数据包一个接一个地堆积在ByteBuf中
ReplayingDecoder内部操作
@Override
protected void callDecode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {
replayable.setCumulation(in);
try {
while (in.isReadable()) {
int oldReaderIndex = checkpoint = in.readerIndex();
while (in.isReadable()) 因为ByteBuf还有数据,所以保持运行.
读取我写的代码中的header信息,buf.ResetReaderIndex()初始化ByteBuf的读取索引为0,导致死循环
项目设置netty 4.1.65final + kotlin 1.5 + spring boot 2.5
服务器部署环境使用的是kubernets,dockerbaseimage使用的是alpine-adopt-openjdk-11.0.11.
重启Netty服务器的过程中,tcp包无限进入
是同一个netty通道发生的,我在通道上查看remoteAddress中的端口,连接到服务器浮动的地方,搜索“netstat-an”,找不到端口号.
当相同的数据包进入时,我尝试关闭通道,但通道没有关闭。
我认为在 netty 中的特定通道上存在无限循环,那是为什么?
在我的服务器日志下方
{"log_level":"INFO","time":"2021-07-14T03:22:31.376Z","log_type":"TCP_LOG","msg":"@@ duplicate packet. channel close"}
{"log_level":"INFO","time":"2021-07-14T03:22:31.376Z","log_type":"TCP_LOG","data_type":"PACKET","byte_array":"AAAA0024450560000000455400002026403695A46000FFFFCBAC","remote_ip":"/10.240.2.129:37508","channel_id":"a437de98"}
{"log_level":"INFO","time":"2021-07-14T03:22:31.388Z","log_type":"TCP_LOG","transfer_type":"SEND","data_type":"HEX","byte_array":"aaaa000c458560000000115a"}
{"log_level":"INFO","time":"2021-07-14T03:22:31.388Z","log_type":"TCP_LOG","msg":"@@ duplicate packet. channel close"}
{"log_level":"INFO","time":"2021-07-14T03:22:31.388Z","log_type":"TCP_LOG","data_type":"PACKET","byte_array":"AAAA0024450560000000455400002026403695A46000FFFFCBAC","remote_ip":"/10.240.2.129:37508","channel_id":"a437de98"}
{"log_level":"INFO","time":"2021-07-14T03:22:31.393Z","log_type":"TCP_LOG","transfer_type":"SEND","data_type":"HEX","byte_array":"aaaa000c458560000000115a"}
额外
用 LengthFieldBasedFrameDecoder 替换解码器没有引起任何问题。
当ReplayingDecoder完成接收数据包时,它应该清空buf,但我怀疑是没有清空造成的。
下面是我写的Replaying Decoder
enum class PacketDecoderState {
READ_HEADER, READ_WHOLE
}
@ExperimentalUnsignedTypes
class PacketReceiverHandler : ReplayingDecoder<PacketDecoderState>(PacketDecoderState.READ_HEADER) {
private var length: Int = 0
override fun decode(ctx: ChannelHandlerContext, buf: ByteBuf, out: MutableList<Any>) {
when (state()) {
PacketDecoderState.READ_HEADER -> {
println("READ HEAD :: $length | ${ctx.channel().remoteAddress()} | ${ctx.channel().id()} | $this | ${Thread.currentThread().name}")
val headerPacket = buf.readBytes(HeaderSize.TOTAL_HEAD_SIZE)
val startCode = ByteBufUtil.getBytes(headerPacket.readBytes(HeaderSize.START_CODE.length))
startCode.takeIf { !it.contentEquals(START_CODE) }?.let {
ctx.channel().close()
val array = ByteArray(buf.readableBytes())
buf.getBytes(buf.readerIndex(), array)
throw TcpPacketValidationCheckException(TcpError(errorCode = ErrorCode.P005, byteArray = array))
}
length = headerPacket.readBytes(HeaderSize.PACKET_LENGTH.length).getUnsignedShort(0)
checkpoint(PacketDecoderState.READ_WHOLE)
buf.resetReaderIndex()
}
PacketDecoderState.READ_WHOLE -> {
println("READ_WHOLE :: $length | ${ctx.channel().remoteAddress()} | ${ctx.channel().id()} | $this | ${Thread.currentThread().name} ")
val bodyPacket = buf.readBytes(length)
checkpoint(PacketDecoderState.READ_HEADER)
out.add(bodyPacket)
}
else -> throw ProtocolsException(AppError(ErrorCode.P000))
}
}
}
当有重复的数据包进来时,在读取头信息和重置读取索引时发生无限循环。
对于普通数据包
aaaa00244505f4000000754e672f4a78435562716b6d71755a636f3253496a673d3d51ea
前10个字节是头信息。
但是当服务器重新启动时,数据包堆积在缓冲区中并立即进来。 aaaa00244505f4000000754e672f4a78435562716b6d71755a636f3253496a673d3d51eaaaaa00244505f4000000754e672f4a78435562716b6d71755a636dea15[]
两个数据包一个接一个地堆积在ByteBuf中
ReplayingDecoder内部操作
@Override
protected void callDecode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {
replayable.setCumulation(in);
try {
while (in.isReadable()) {
int oldReaderIndex = checkpoint = in.readerIndex();
while (in.isReadable()) 因为ByteBuf还有数据,所以保持运行.
读取我写的代码中的header信息,buf.ResetReaderIndex()初始化ByteBuf的读取索引为0,导致死循环