如何将字符串插入 Netty CompositeByteBuf 的中间

How to insert a string into the middle of a Netty CompositeByteBuf

我有一个 CompositeByteBuf,其中包含一些组成 HTTP 请求的缓冲区,我想在 HTTP 请求行之后立即注入一个额外的 HTTP header 字段。 (我不想使用整个 HTTP encoder/decoder,因为我只是代理数据,不需要将其全部解析为 HTTP)。

如何使用派生缓冲区执行此操作,从而避免复制 CompositeByteBuf 的内容。我使用 slice 和 readSlice 所做的每一次尝试都产生了 indexoutofbounds 错误或 Stack Overflow。谁能提出不需要复制整个 compositebytebuf 的替代方案?

/**
 * Injects an XFF header into pendingBuf 
 */
private void addXForwardedForHeaderToPendingBuf(
                 int pLFpos, 
              String pRemoteIPaddr)
{
    //create a new buffer 
    ByteBuf newBuf = inboundChannel.alloc().directBuffer(); 

    //add the HTTP request line to it
    ByteBufUtil.writeUtf8(newBuf, 
                          pendingBuf.readCharSequence(pLFpos + 1, 
                          CharsetUtil.UTF_8));

    //add the XFF header
    ByteBufUtil.writeUtf8(newBuf, "X-Forwarded-For: ");
    ByteBufUtil.writeUtf8(newBuf, pRemoteIPaddr);
    ByteBufUtil.writeUtf8(newBuf, "\r\n");

    //add anything from the original buffer that came after the request line
    int bytesRemaining = pendingBuf.readableBytes();
    if (bytesRemaining > 0)
    {
        newBuf.writeBytes(pendingBuf);
    }

    //clear pendingBuf
    pendingBuf.removeComponents(0, pendingBuf.numComponents()); 
    pendingBuf.setIndex(0, 0);

    //add newBuf into pendingBuf 
    pendingBuf.addComponent(newBuf);
    pendingBuf.writerIndex(pendingBuf.writerIndex() + newBuf.writerIndex()); 
}

虽然编辑当前 bytebuf 的缺点是在最坏的情况下所有字节都需要移动,但我们可以利用 CompositeByteBuf 具有我们可以根据需要编辑和移动的组件这一事实。

我们基本上要实现以下步骤:

  1. 因为一个CompositeByteBuf里面可能有多个Bytebuf,所以我们要查找我们要修改的buf的索引

    ByteBuf为我们提供了以下方法:

    遗憾的是,在字符串末尾插入的情况下,这些方法将无法正常工作,因为这在技术上超出了原始缓冲区的范围,我们需要为此添加一个特殊情况。

  2. 我们想要实现我们想要精确插入多个缓冲区之间的边界的特殊情况,因为在这些情况下我们实际上可以使用零拷贝。

  3. 如果拆分索引恰好落在一个bytebuf的中间,我们需要拆分它,并将其自身添加为2个单独的缓冲区。
  4. 我们需要更新合成器上的作者索引,

使用上述流程,我们可以创建以下代码:

public static void insertString(CompositeByteBuf buffer, int index, ByteBuf insertion) {
    try {
        if (buffer == null) {
            throw new NullPointerException("buffer");
        }
        if (insertion == null) {
            throw new NullPointerException("insertion");
        }
        if (buffer.readableBytes() < index) {
            throw new IllegalArgumentException("buffer.readableBytes() < index: "
                    + buffer.readableBytes() + " < " + index);
        }

        // Start by checking the offset where we need to inject the insertion
        int injectionBufOffset;
        int injectionByteOffset;
        if (index == buffer.readableBytes()) {
            injectionBufOffset = buffer.numComponents();
            injectionByteOffset = 0;
        } else {
            injectionBufOffset = buffer.toComponentIndex(index);
            injectionByteOffset = index - buffer.toByteIndex(injectionBufOffset);
        }

        // Optimalize in the case of offset 0
        if (injectionByteOffset == 0) {
            buffer.addComponent(injectionBufOffset, insertion.retain());
            buffer.writerIndex(buffer.writerIndex() + insertion.readableBytes());
            return;
        }
        // Do the split technique
        ByteBuf toSplit = buffer.internalComponent(injectionBufOffset).retain();
        try {
            buffer.removeComponent(injectionBufOffset);
            buffer.addComponent(injectionBufOffset + 0,
                    toSplit.readSlice(injectionByteOffset).retain());
            buffer.addComponent(injectionBufOffset + 1, insertion.retain());
            buffer.addComponent(injectionBufOffset + 2,
                    toSplit.retain());
            buffer.writerIndex(buffer.writerIndex() + insertion.readableBytes());
        } finally {
            ReferenceCountUtil.release(toSplit);
        }
    } finally {
        if (insertion != null) {
            ReferenceCountUtil.release(insertion);
        }
    }
}

由于这段代码相当复杂,我们也想确保它的测试正确,因此,我们需要一些单元测试(JUnit):

import static test.NettySplit.insertString;

public class NettySplitTest {

    CompositeByteBuf buffer;
    ByteBuf test;

    private void addByteBuf(CompositeByteBuf target, ByteBuf source) {
        target.addComponent(source);
        target.writerIndex(target.writerIndex() + source.readableBytes());
    }

    @Before
    public void before() {
        buffer = ByteBufAllocator.DEFAULT.compositeBuffer();
    }

    @After
    public void after() {
        ReferenceCountUtil.release(buffer);
        buffer = null;
        ReferenceCountUtil.release(test);
        test = null;
    }

    @Test
    public void testSplitting() {
        addByteBuf(buffer, Unpooled.wrappedBuffer(new byte[]{0, 1, 2, 3}));

        insertString(buffer, 2, Unpooled.wrappedBuffer(new byte[]{5}));

        test = Unpooled.wrappedBuffer(new byte[]{0, 1, 5, 2, 3});
        assertEquals(test, buffer);

    }

    @Test
    public void testInsertionStart() {
        addByteBuf(buffer, Unpooled.wrappedBuffer(new byte[]{0, 1, 2, 3}));

        insertString(buffer, 0, Unpooled.wrappedBuffer(new byte[]{5}));

        test = Unpooled.wrappedBuffer(new byte[]{5, 0, 1, 2, 3});
        assertEquals(test, buffer);
    }

    @Test
    public void testInsertionEnd() {
        addByteBuf(buffer, Unpooled.wrappedBuffer(new byte[]{0, 1, 2, 3}));

        insertString(buffer, 4, Unpooled.wrappedBuffer(new byte[]{5}));

        test = Unpooled.wrappedBuffer(new byte[]{0, 1, 2, 3, 5});
        assertEquals(test, buffer);
    }

    @Test
    public void testInsertionSplitEnd() {
        addByteBuf(buffer, Unpooled.wrappedBuffer(new byte[]{0, 1, 2, 3}));
        addByteBuf(buffer, Unpooled.wrappedBuffer(new byte[]{0, 1, 2, 3}));

        insertString(buffer, 6, Unpooled.wrappedBuffer(new byte[]{5}));

        test = Unpooled.wrappedBuffer(new byte[]{0, 1, 2, 3, 0, 1, 5, 2, 3});
        assertEquals(test, buffer);
    }

}