如何让我的 bytebuf 发送我的整个消息 Netty
how to get my bytebuf to send my entire message Netty
我大家。
我问你是因为Netty的解码器中的ByteBuf有问题
我想解码由服务器到达的消息,但 ByteBuf 无法正常工作。
问题是 ByteBuf 没有占用消息的所有字节。
我解释一下,我有一个长度为1221字节的消息(这是一个例子),但是缓冲区大小只有64字节。
当我尝试读取缓冲区的长度时,出现这样的错误:
io.netty.handler.codec.DecoderException: java.lang.IndexOutOfBoundsException: readerIndex(117) + length(101) exceeds writerIndex(192): PooledUnsafeDirectByteBuf(ridx: 117, widx: 192, cap: 192)
at io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:470) ~[netty-all-4.1.37.Final.jar:4.1.37.Final]
at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:276) ~[netty-all-4.1.37.Final.jar:4.1.37.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:374) [netty-all-4.1.37.Final.jar:4.1.37.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:360) [netty-all-4.1.37.Final.jar:4.1.37.Final]
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:352) [netty-all-4.1.37.Final.jar:4.1.37.Final]
at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1408) [netty-all-4.1.37.Final.jar:4.1.37.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:374) [netty-all-4.1.37.Final.jar:4.1.37.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:360) [netty-all-4.1.37.Final.jar:4.1.37.Final]
at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:930) [netty-all-4.1.37.Final.jar:4.1.37.Final]
at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:163) [netty-all-4.1.37.Final.jar:4.1.37.Final]
at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:697) [netty-all-4.1.37.Final.jar:4.1.37.Final]
at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:632) [netty-all-4.1.37.Final.jar:4.1.37.Final]
at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:549) [netty-all-4.1.37.Final.jar:4.1.37.Final]
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:511) [netty-all-4.1.37.Final.jar:4.1.37.Final]
at io.netty.util.concurrent.SingleThreadEventExecutor.run(SingleThreadEventExecutor.java:918) [netty-all-4.1.37.Final.jar:4.1.37.Final]
at io.netty.util.internal.ThreadExecutorMap.run(ThreadExecutorMap.java:74) [netty-all-4.1.37.Final.jar:4.1.37.Final]
at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) [netty-all-4.1.37.Final.jar:4.1.37.Final]
at java.lang.Thread.run(Thread.java:748) [?:1.8.0_181]
我认为 Netty 没有时间阅读所有内容并只发送部分消息,但如果我可以配置 Netty,我现在不会,因为他必须等到消息全部到达。
如果有人能帮助我,我很感激
如果需要帮助,我给你解码器的代码
int length = buffer.readInt();
int messageType = buffer.readInt();
Supplier<AbstractMessage> supplier = SUPPLIERS.get(messageType);
if (supplier == null) {
LOGGER.debug("This message type isn't supported: {}", messageType);
} else {
ByteBuf data = buffer.readBytes(length);
if (messageType != 6) {
AbstractMessage message = supplier.get();
message.read(data, version);
list.add(message);
LOGGER.debug("{}", message);
}
}
}
}
消息格式如下:
4 字节的 MessageLength (int)
4 字节的 MessageType (int)
n 字节的数据(MessageLength 大小)
我给你我用来解释的文档Here。
您需要通过扩展 ByteToMessageDecoder
和缓冲来编写自己的解码器,直到您收到所有内容。由于这是 TCP,您可能会收到碎片化的字节,因此您需要自己 assemble 再次接收它。
像这样的东西应该可以工作:
class MyDecoder extends ByteToMessageDecoder {
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf input, List<Object> out) {
if (input.readableBytes() < 4) {
// we need to have at least 4 bytes to read to be able to get the message length
return;
}
int length = input.getInt(input.readerIndex());
if (input.readableBytes() < 8 + length) {
// ensure we have enough data so we can also read the message type and the whole message body
return;
}
// skip the length now
input.skipBytes(4);
int messageType = input.readInt();
Supplier<AbstractMessage> supplier = SUPPLIERS.get(messageType);
if (supplier == null) {
LOGGER.debug("This message type isn't supported: {}", messageType);
input.skip(length);
} else {
if (messageType != 6) {
ByteBuf data = buffer.readSlice(length);
AbstractMessage message = supplier.get();
message.read(data, version);
list.add(message);
LOGGER.debug("{}", message);
}
}
}
}
非常感谢。
我进行了更改,但您的代码启发了我并解决了我的问题。
我以为我收到了片段消息,但我对缓冲区的了解不多。
如果这个post可以帮助其他人,我把更正后的代码放在下面
if (length == 0) {
length = buffer.readInt();
messageType = buffer.readInt();
}
if (buffer.writerIndex() < length + buffer.readerIndex()) {
// ensure we have enough data so we can also read the message type and the whole message body
return;
}
if (!(messageType == MessageType.HEARTBEAT_REQ.getValue() || messageType == MessageType.HEARTBEAT_CONF.getValue())) {
LOGGER.debug("The message type is : {}", messageType);
}
Supplier<AbstractMessage> supplier = SUPPLIERS.get(messageType);
if (supplier == null) {
LOGGER.debug("This message type isn't supported: {}", messageType);
buffer.skipBytes(length);
} else {
ByteBuf data = buffer.readSlice(length);
AbstractMessage message = supplier.get();
message.read(data, version);
list.add(message);
if (messageType != 6) {
LOGGER.debug("{}", message);
}
length = 0;
messageType = 0;
}
请注意,length 和 messageType 变量现在在解码器中是静态的
private static int length = 0;
private static int messageType = 0;
我大家。
我问你是因为Netty的解码器中的ByteBuf有问题
我想解码由服务器到达的消息,但 ByteBuf 无法正常工作。
问题是 ByteBuf 没有占用消息的所有字节。
我解释一下,我有一个长度为1221字节的消息(这是一个例子),但是缓冲区大小只有64字节。
当我尝试读取缓冲区的长度时,出现这样的错误:
io.netty.handler.codec.DecoderException: java.lang.IndexOutOfBoundsException: readerIndex(117) + length(101) exceeds writerIndex(192): PooledUnsafeDirectByteBuf(ridx: 117, widx: 192, cap: 192)
at io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:470) ~[netty-all-4.1.37.Final.jar:4.1.37.Final]
at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:276) ~[netty-all-4.1.37.Final.jar:4.1.37.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:374) [netty-all-4.1.37.Final.jar:4.1.37.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:360) [netty-all-4.1.37.Final.jar:4.1.37.Final]
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:352) [netty-all-4.1.37.Final.jar:4.1.37.Final]
at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1408) [netty-all-4.1.37.Final.jar:4.1.37.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:374) [netty-all-4.1.37.Final.jar:4.1.37.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:360) [netty-all-4.1.37.Final.jar:4.1.37.Final]
at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:930) [netty-all-4.1.37.Final.jar:4.1.37.Final]
at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:163) [netty-all-4.1.37.Final.jar:4.1.37.Final]
at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:697) [netty-all-4.1.37.Final.jar:4.1.37.Final]
at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:632) [netty-all-4.1.37.Final.jar:4.1.37.Final]
at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:549) [netty-all-4.1.37.Final.jar:4.1.37.Final]
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:511) [netty-all-4.1.37.Final.jar:4.1.37.Final]
at io.netty.util.concurrent.SingleThreadEventExecutor.run(SingleThreadEventExecutor.java:918) [netty-all-4.1.37.Final.jar:4.1.37.Final]
at io.netty.util.internal.ThreadExecutorMap.run(ThreadExecutorMap.java:74) [netty-all-4.1.37.Final.jar:4.1.37.Final]
at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) [netty-all-4.1.37.Final.jar:4.1.37.Final]
at java.lang.Thread.run(Thread.java:748) [?:1.8.0_181]
我认为 Netty 没有时间阅读所有内容并只发送部分消息,但如果我可以配置 Netty,我现在不会,因为他必须等到消息全部到达。
如果有人能帮助我,我很感激
如果需要帮助,我给你解码器的代码
int length = buffer.readInt();
int messageType = buffer.readInt();
Supplier<AbstractMessage> supplier = SUPPLIERS.get(messageType);
if (supplier == null) {
LOGGER.debug("This message type isn't supported: {}", messageType);
} else {
ByteBuf data = buffer.readBytes(length);
if (messageType != 6) {
AbstractMessage message = supplier.get();
message.read(data, version);
list.add(message);
LOGGER.debug("{}", message);
}
}
}
}
消息格式如下: 4 字节的 MessageLength (int) 4 字节的 MessageType (int) n 字节的数据(MessageLength 大小)
我给你我用来解释的文档Here。
您需要通过扩展 ByteToMessageDecoder
和缓冲来编写自己的解码器,直到您收到所有内容。由于这是 TCP,您可能会收到碎片化的字节,因此您需要自己 assemble 再次接收它。
像这样的东西应该可以工作:
class MyDecoder extends ByteToMessageDecoder {
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf input, List<Object> out) {
if (input.readableBytes() < 4) {
// we need to have at least 4 bytes to read to be able to get the message length
return;
}
int length = input.getInt(input.readerIndex());
if (input.readableBytes() < 8 + length) {
// ensure we have enough data so we can also read the message type and the whole message body
return;
}
// skip the length now
input.skipBytes(4);
int messageType = input.readInt();
Supplier<AbstractMessage> supplier = SUPPLIERS.get(messageType);
if (supplier == null) {
LOGGER.debug("This message type isn't supported: {}", messageType);
input.skip(length);
} else {
if (messageType != 6) {
ByteBuf data = buffer.readSlice(length);
AbstractMessage message = supplier.get();
message.read(data, version);
list.add(message);
LOGGER.debug("{}", message);
}
}
}
}
非常感谢。 我进行了更改,但您的代码启发了我并解决了我的问题。 我以为我收到了片段消息,但我对缓冲区的了解不多。
如果这个post可以帮助其他人,我把更正后的代码放在下面
if (length == 0) {
length = buffer.readInt();
messageType = buffer.readInt();
}
if (buffer.writerIndex() < length + buffer.readerIndex()) {
// ensure we have enough data so we can also read the message type and the whole message body
return;
}
if (!(messageType == MessageType.HEARTBEAT_REQ.getValue() || messageType == MessageType.HEARTBEAT_CONF.getValue())) {
LOGGER.debug("The message type is : {}", messageType);
}
Supplier<AbstractMessage> supplier = SUPPLIERS.get(messageType);
if (supplier == null) {
LOGGER.debug("This message type isn't supported: {}", messageType);
buffer.skipBytes(length);
} else {
ByteBuf data = buffer.readSlice(length);
AbstractMessage message = supplier.get();
message.read(data, version);
list.add(message);
if (messageType != 6) {
LOGGER.debug("{}", message);
}
length = 0;
messageType = 0;
}
请注意,length 和 messageType 变量现在在解码器中是静态的
private static int length = 0;
private static int messageType = 0;