netty:如何就地编辑 bytebuf 的后端数组?

netty: How to edit the backend array of bytebuf in place?

在我的用例中,我需要在读取或写入之前对每个字节进行异或运算 到网络。所以我实现了一个双工处理程序来执行此操作。

但奇怪的是我无法就地编辑输出 bytebuf, 而阅读方可以:

@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
    ByteBuf buf = (ByteBuf) msg;
    ByteBuf res = buf.alloc().buffer(buf.readableBytes());
    buf.forEachByte(value -> {
        res.writeByte(value ^ cookie);
        return true;
    });
    buf.release();
    super.write(ctx, res, promise);
}

@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
    ByteBuf buf = (ByteBuf) msg;
    buf.forEachByte(new ByteProcessor() {
        private int i = 0;

        @Override
        public boolean process(byte value) throws Exception {
            buf.setByte(i++, value ^ cookie);
            return true;
        }
    });
    super.channelRead(ctx, msg);
}

注意write()的msg参数是在ByteBuf类型中保证的 它以前的处理程序。

如果我在 write() 中就地编辑 bytebuf,应用程序将无法正常工作 预期(我不知道会发生什么,我的应用程序是一个代理程序,如果我 就地编辑 buf,它不会工作:某些代理连接会 给出错误的内容),所以我不得不回退复制缓冲区进行编辑, 这是一种缓慢的方式,对吧?

write()中的bytebuf有什么特别之处吗?

编辑:

代理服务器成对处理两个通道,入站通道和出站通道,并在它们之间代理数据。

入站渠道流水线:

ch.pipeline().addLast("ReadTimeoutHandler", new ReadTimeoutHandler(30));
ch.pipeline().addLast("WriteTimeoutHandler", new WriteTimeoutHandler(30));
cookie.ifPresent(c -> ch.pipeline().addLast(new FuzzHandler(c)));
ch.pipeline().addLast(new CopyHandler());

出站通道流水线:

ch.pipeline().addLast(new CopyHandler());

CopyHandlerwrite() 配对频道。

一般来说应该是可以调整到位的。我怀疑您可能会看到问题,因为您多次写入相同的缓冲区(或共享相同存储空间的不同缓冲区)。

我发现了问题。

在我的代码的某些路径中,我 Unpooled.wrappedBuffer() 一个静态字节数组并执行 write(),所以如果出站处理程序中的 write() 直接修改缓冲区,它会改变byte数组,影响下一次复用,所以写入网络的内容有一半是错误的。

所以解决方案是复制字节数组:

ByteBuf buf = ctx.alloc().buffer(src.length);
buf.writeBytes(src);

顺便说一句,就地修改缓冲区的正确方法是什么? setByte() 应该使用 readerIndex 作为偏移量?

ByteBuf buf = (ByteBuf) msg;
final int readerIndex = buf.readerIndex();
buf.forEachByte(new ByteProcessor() {
    private int i = 0;

    @Override
    public boolean process(byte value) throws Exception {
        buf.setByte(readerIndex + i, value ^ cookie);
        i++;
        return true;
    }
});