如何让我的 bytebuf 发送我的整个消息 Netty

how to get my bytebuf to send my entire message Netty

我大家。

我问你是因为Netty的解码器中的ByteBuf有问题

我想解码由服务器到达的消息,但 ByteBuf 无法正常工作。

问题是 ByteBuf 没有占用消息的所有字节。

我解释一下,我有一个长度为1221字节的消息(这是一个例子),但是缓冲区大小只有64字节。

当我尝试读取缓冲区的长度时,出现这样的错误:

io.netty.handler.codec.DecoderException: java.lang.IndexOutOfBoundsException: readerIndex(117) + length(101) exceeds writerIndex(192): PooledUnsafeDirectByteBuf(ridx: 117, widx: 192, cap: 192)
at io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:470) ~[netty-all-4.1.37.Final.jar:4.1.37.Final]
at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:276) ~[netty-all-4.1.37.Final.jar:4.1.37.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:374) [netty-all-4.1.37.Final.jar:4.1.37.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:360) [netty-all-4.1.37.Final.jar:4.1.37.Final]
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:352) [netty-all-4.1.37.Final.jar:4.1.37.Final]
at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1408) [netty-all-4.1.37.Final.jar:4.1.37.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:374) [netty-all-4.1.37.Final.jar:4.1.37.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:360) [netty-all-4.1.37.Final.jar:4.1.37.Final]
at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:930) [netty-all-4.1.37.Final.jar:4.1.37.Final]
at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:163) [netty-all-4.1.37.Final.jar:4.1.37.Final]
at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:697) [netty-all-4.1.37.Final.jar:4.1.37.Final]
at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:632) [netty-all-4.1.37.Final.jar:4.1.37.Final]
at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:549) [netty-all-4.1.37.Final.jar:4.1.37.Final]
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:511) [netty-all-4.1.37.Final.jar:4.1.37.Final]
at io.netty.util.concurrent.SingleThreadEventExecutor.run(SingleThreadEventExecutor.java:918) [netty-all-4.1.37.Final.jar:4.1.37.Final]
at io.netty.util.internal.ThreadExecutorMap.run(ThreadExecutorMap.java:74) [netty-all-4.1.37.Final.jar:4.1.37.Final]
at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) [netty-all-4.1.37.Final.jar:4.1.37.Final]
at java.lang.Thread.run(Thread.java:748) [?:1.8.0_181]

我认为 Netty 没有时间阅读所有内容并只发送部分消息,但如果我可以配置 Netty,我现在不会,因为他必须等到消息全部到达。

如果有人能帮助我,我很感激

如果需要帮助,我给你解码器的代码

    int length = buffer.readInt();
    int messageType = buffer.readInt();


        Supplier<AbstractMessage> supplier = SUPPLIERS.get(messageType);
        if (supplier == null) {
            LOGGER.debug("This message type isn't supported: {}", messageType);
        } else {
            ByteBuf data = buffer.readBytes(length);
            if (messageType != 6) {
                AbstractMessage message = supplier.get();
                message.read(data, version);
                list.add(message);
                LOGGER.debug("{}", message);
            }
        }
    }

}

消息格式如下: 4 字节的 MessageLength (int) 4 字节的 MessageType (int) n 字节的数据(MessageLength 大小)

我给你我用来解释的文档Here

您需要通过扩展 ByteToMessageDecoder 和缓冲来编写自己的解码器,直到您收到所有内容。由于这是 TCP,您可能会收到碎片化的字节,因此您需要自己 assemble 再次接收它。

像这样的东西应该可以工作:

class MyDecoder extends ByteToMessageDecoder {

    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf input, List<Object> out) {
        if (input.readableBytes() < 4) {
            // we need to have at least 4 bytes to read to be able to get the message length
            return;
        }
        int length = input.getInt(input.readerIndex());
        if (input.readableBytes() < 8 + length) {
            // ensure we have enough data so we can also read the message type and the whole message body
            return;
        }
        // skip the length now
        input.skipBytes(4);

        int messageType = input.readInt();

        Supplier<AbstractMessage> supplier = SUPPLIERS.get(messageType);
        if (supplier == null) {
            LOGGER.debug("This message type isn't supported: {}", messageType);
            input.skip(length);
        } else {
            if (messageType != 6) {
                ByteBuf data = buffer.readSlice(length);
                AbstractMessage message = supplier.get();
                message.read(data, version);
                list.add(message);
                LOGGER.debug("{}", message);
            }
        }
    }
}

非常感谢。 我进行了更改,但您的代码启发了我并解决了我的问题。 我以为我收到了片段消息,但我对缓冲区的了解不多。

如果这个post可以帮助其他人,我把更正后的代码放在下面

if (length == 0) {
        length = buffer.readInt();
        messageType = buffer.readInt();
    }

    if (buffer.writerIndex() < length + buffer.readerIndex()) {
        // ensure we have enough data so we can also read the message type and the whole message body
        return;
    }

    if (!(messageType == MessageType.HEARTBEAT_REQ.getValue() || messageType == MessageType.HEARTBEAT_CONF.getValue())) {
        LOGGER.debug("The message type is : {}", messageType);
    }

    Supplier<AbstractMessage> supplier = SUPPLIERS.get(messageType);
    if (supplier == null) {
        LOGGER.debug("This message type isn't supported: {}", messageType);
        buffer.skipBytes(length);
    } else {
        ByteBuf data = buffer.readSlice(length);
        AbstractMessage message = supplier.get();
        message.read(data, version);
        list.add(message);
        if (messageType != 6) {
            LOGGER.debug("{}", message);
        }
        length = 0;
        messageType = 0;
    }

请注意,length 和 messageType 变量现在在解码器中是静态的

private static int length = 0;
private static int messageType = 0;