如何将字符串插入 Netty CompositeByteBuf 的中间
How to insert a string into the middle of a Netty CompositeByteBuf
我有一个 CompositeByteBuf
,其中包含一些组成 HTTP 请求的缓冲区,我想在 HTTP 请求行之后立即注入一个额外的 HTTP header 字段。 (我不想使用整个 HTTP encoder/decoder,因为我只是代理数据,不需要将其全部解析为 HTTP)。
如何使用派生缓冲区执行此操作,从而避免复制 CompositeByteBuf
的内容。我使用 slice 和 readSlice 所做的每一次尝试都产生了 indexoutofbounds 错误或 Stack Overflow。谁能提出不需要复制整个 compositebytebuf 的替代方案?
/**
* Injects an XFF header into pendingBuf
*/
private void addXForwardedForHeaderToPendingBuf(
int pLFpos,
String pRemoteIPaddr)
{
//create a new buffer
ByteBuf newBuf = inboundChannel.alloc().directBuffer();
//add the HTTP request line to it
ByteBufUtil.writeUtf8(newBuf,
pendingBuf.readCharSequence(pLFpos + 1,
CharsetUtil.UTF_8));
//add the XFF header
ByteBufUtil.writeUtf8(newBuf, "X-Forwarded-For: ");
ByteBufUtil.writeUtf8(newBuf, pRemoteIPaddr);
ByteBufUtil.writeUtf8(newBuf, "\r\n");
//add anything from the original buffer that came after the request line
int bytesRemaining = pendingBuf.readableBytes();
if (bytesRemaining > 0)
{
newBuf.writeBytes(pendingBuf);
}
//clear pendingBuf
pendingBuf.removeComponents(0, pendingBuf.numComponents());
pendingBuf.setIndex(0, 0);
//add newBuf into pendingBuf
pendingBuf.addComponent(newBuf);
pendingBuf.writerIndex(pendingBuf.writerIndex() + newBuf.writerIndex());
}
虽然编辑当前 bytebuf 的缺点是在最坏的情况下所有字节都需要移动,但我们可以利用 CompositeByteBuf
具有我们可以根据需要编辑和移动的组件这一事实。
我们基本上要实现以下步骤:
因为一个CompositeByteBuf
里面可能有多个Bytebuf
,所以我们要查找我们要修改的buf的索引
ByteBuf
为我们提供了以下方法:
遗憾的是,在字符串末尾插入的情况下,这些方法将无法正常工作,因为这在技术上超出了原始缓冲区的范围,我们需要为此添加一个特殊情况。
我们想要实现我们想要精确插入多个缓冲区之间的边界的特殊情况,因为在这些情况下我们实际上可以使用零拷贝。
- 如果拆分索引恰好落在一个bytebuf的中间,我们需要拆分它,并将其自身添加为2个单独的缓冲区。
- 我们需要更新合成器上的作者索引,
使用上述流程,我们可以创建以下代码:
public static void insertString(CompositeByteBuf buffer, int index, ByteBuf insertion) {
try {
if (buffer == null) {
throw new NullPointerException("buffer");
}
if (insertion == null) {
throw new NullPointerException("insertion");
}
if (buffer.readableBytes() < index) {
throw new IllegalArgumentException("buffer.readableBytes() < index: "
+ buffer.readableBytes() + " < " + index);
}
// Start by checking the offset where we need to inject the insertion
int injectionBufOffset;
int injectionByteOffset;
if (index == buffer.readableBytes()) {
injectionBufOffset = buffer.numComponents();
injectionByteOffset = 0;
} else {
injectionBufOffset = buffer.toComponentIndex(index);
injectionByteOffset = index - buffer.toByteIndex(injectionBufOffset);
}
// Optimalize in the case of offset 0
if (injectionByteOffset == 0) {
buffer.addComponent(injectionBufOffset, insertion.retain());
buffer.writerIndex(buffer.writerIndex() + insertion.readableBytes());
return;
}
// Do the split technique
ByteBuf toSplit = buffer.internalComponent(injectionBufOffset).retain();
try {
buffer.removeComponent(injectionBufOffset);
buffer.addComponent(injectionBufOffset + 0,
toSplit.readSlice(injectionByteOffset).retain());
buffer.addComponent(injectionBufOffset + 1, insertion.retain());
buffer.addComponent(injectionBufOffset + 2,
toSplit.retain());
buffer.writerIndex(buffer.writerIndex() + insertion.readableBytes());
} finally {
ReferenceCountUtil.release(toSplit);
}
} finally {
if (insertion != null) {
ReferenceCountUtil.release(insertion);
}
}
}
由于这段代码相当复杂,我们也想确保它的测试正确,因此,我们需要一些单元测试(JUnit):
import static test.NettySplit.insertString;
public class NettySplitTest {
CompositeByteBuf buffer;
ByteBuf test;
private void addByteBuf(CompositeByteBuf target, ByteBuf source) {
target.addComponent(source);
target.writerIndex(target.writerIndex() + source.readableBytes());
}
@Before
public void before() {
buffer = ByteBufAllocator.DEFAULT.compositeBuffer();
}
@After
public void after() {
ReferenceCountUtil.release(buffer);
buffer = null;
ReferenceCountUtil.release(test);
test = null;
}
@Test
public void testSplitting() {
addByteBuf(buffer, Unpooled.wrappedBuffer(new byte[]{0, 1, 2, 3}));
insertString(buffer, 2, Unpooled.wrappedBuffer(new byte[]{5}));
test = Unpooled.wrappedBuffer(new byte[]{0, 1, 5, 2, 3});
assertEquals(test, buffer);
}
@Test
public void testInsertionStart() {
addByteBuf(buffer, Unpooled.wrappedBuffer(new byte[]{0, 1, 2, 3}));
insertString(buffer, 0, Unpooled.wrappedBuffer(new byte[]{5}));
test = Unpooled.wrappedBuffer(new byte[]{5, 0, 1, 2, 3});
assertEquals(test, buffer);
}
@Test
public void testInsertionEnd() {
addByteBuf(buffer, Unpooled.wrappedBuffer(new byte[]{0, 1, 2, 3}));
insertString(buffer, 4, Unpooled.wrappedBuffer(new byte[]{5}));
test = Unpooled.wrappedBuffer(new byte[]{0, 1, 2, 3, 5});
assertEquals(test, buffer);
}
@Test
public void testInsertionSplitEnd() {
addByteBuf(buffer, Unpooled.wrappedBuffer(new byte[]{0, 1, 2, 3}));
addByteBuf(buffer, Unpooled.wrappedBuffer(new byte[]{0, 1, 2, 3}));
insertString(buffer, 6, Unpooled.wrappedBuffer(new byte[]{5}));
test = Unpooled.wrappedBuffer(new byte[]{0, 1, 2, 3, 0, 1, 5, 2, 3});
assertEquals(test, buffer);
}
}
我有一个 CompositeByteBuf
,其中包含一些组成 HTTP 请求的缓冲区,我想在 HTTP 请求行之后立即注入一个额外的 HTTP header 字段。 (我不想使用整个 HTTP encoder/decoder,因为我只是代理数据,不需要将其全部解析为 HTTP)。
如何使用派生缓冲区执行此操作,从而避免复制 CompositeByteBuf
的内容。我使用 slice 和 readSlice 所做的每一次尝试都产生了 indexoutofbounds 错误或 Stack Overflow。谁能提出不需要复制整个 compositebytebuf 的替代方案?
/**
* Injects an XFF header into pendingBuf
*/
private void addXForwardedForHeaderToPendingBuf(
int pLFpos,
String pRemoteIPaddr)
{
//create a new buffer
ByteBuf newBuf = inboundChannel.alloc().directBuffer();
//add the HTTP request line to it
ByteBufUtil.writeUtf8(newBuf,
pendingBuf.readCharSequence(pLFpos + 1,
CharsetUtil.UTF_8));
//add the XFF header
ByteBufUtil.writeUtf8(newBuf, "X-Forwarded-For: ");
ByteBufUtil.writeUtf8(newBuf, pRemoteIPaddr);
ByteBufUtil.writeUtf8(newBuf, "\r\n");
//add anything from the original buffer that came after the request line
int bytesRemaining = pendingBuf.readableBytes();
if (bytesRemaining > 0)
{
newBuf.writeBytes(pendingBuf);
}
//clear pendingBuf
pendingBuf.removeComponents(0, pendingBuf.numComponents());
pendingBuf.setIndex(0, 0);
//add newBuf into pendingBuf
pendingBuf.addComponent(newBuf);
pendingBuf.writerIndex(pendingBuf.writerIndex() + newBuf.writerIndex());
}
虽然编辑当前 bytebuf 的缺点是在最坏的情况下所有字节都需要移动,但我们可以利用 CompositeByteBuf
具有我们可以根据需要编辑和移动的组件这一事实。
我们基本上要实现以下步骤:
因为一个
CompositeByteBuf
里面可能有多个Bytebuf
,所以我们要查找我们要修改的buf的索引ByteBuf
为我们提供了以下方法:遗憾的是,在字符串末尾插入的情况下,这些方法将无法正常工作,因为这在技术上超出了原始缓冲区的范围,我们需要为此添加一个特殊情况。
我们想要实现我们想要精确插入多个缓冲区之间的边界的特殊情况,因为在这些情况下我们实际上可以使用零拷贝。
- 如果拆分索引恰好落在一个bytebuf的中间,我们需要拆分它,并将其自身添加为2个单独的缓冲区。
- 我们需要更新合成器上的作者索引,
使用上述流程,我们可以创建以下代码:
public static void insertString(CompositeByteBuf buffer, int index, ByteBuf insertion) {
try {
if (buffer == null) {
throw new NullPointerException("buffer");
}
if (insertion == null) {
throw new NullPointerException("insertion");
}
if (buffer.readableBytes() < index) {
throw new IllegalArgumentException("buffer.readableBytes() < index: "
+ buffer.readableBytes() + " < " + index);
}
// Start by checking the offset where we need to inject the insertion
int injectionBufOffset;
int injectionByteOffset;
if (index == buffer.readableBytes()) {
injectionBufOffset = buffer.numComponents();
injectionByteOffset = 0;
} else {
injectionBufOffset = buffer.toComponentIndex(index);
injectionByteOffset = index - buffer.toByteIndex(injectionBufOffset);
}
// Optimalize in the case of offset 0
if (injectionByteOffset == 0) {
buffer.addComponent(injectionBufOffset, insertion.retain());
buffer.writerIndex(buffer.writerIndex() + insertion.readableBytes());
return;
}
// Do the split technique
ByteBuf toSplit = buffer.internalComponent(injectionBufOffset).retain();
try {
buffer.removeComponent(injectionBufOffset);
buffer.addComponent(injectionBufOffset + 0,
toSplit.readSlice(injectionByteOffset).retain());
buffer.addComponent(injectionBufOffset + 1, insertion.retain());
buffer.addComponent(injectionBufOffset + 2,
toSplit.retain());
buffer.writerIndex(buffer.writerIndex() + insertion.readableBytes());
} finally {
ReferenceCountUtil.release(toSplit);
}
} finally {
if (insertion != null) {
ReferenceCountUtil.release(insertion);
}
}
}
由于这段代码相当复杂,我们也想确保它的测试正确,因此,我们需要一些单元测试(JUnit):
import static test.NettySplit.insertString;
public class NettySplitTest {
CompositeByteBuf buffer;
ByteBuf test;
private void addByteBuf(CompositeByteBuf target, ByteBuf source) {
target.addComponent(source);
target.writerIndex(target.writerIndex() + source.readableBytes());
}
@Before
public void before() {
buffer = ByteBufAllocator.DEFAULT.compositeBuffer();
}
@After
public void after() {
ReferenceCountUtil.release(buffer);
buffer = null;
ReferenceCountUtil.release(test);
test = null;
}
@Test
public void testSplitting() {
addByteBuf(buffer, Unpooled.wrappedBuffer(new byte[]{0, 1, 2, 3}));
insertString(buffer, 2, Unpooled.wrappedBuffer(new byte[]{5}));
test = Unpooled.wrappedBuffer(new byte[]{0, 1, 5, 2, 3});
assertEquals(test, buffer);
}
@Test
public void testInsertionStart() {
addByteBuf(buffer, Unpooled.wrappedBuffer(new byte[]{0, 1, 2, 3}));
insertString(buffer, 0, Unpooled.wrappedBuffer(new byte[]{5}));
test = Unpooled.wrappedBuffer(new byte[]{5, 0, 1, 2, 3});
assertEquals(test, buffer);
}
@Test
public void testInsertionEnd() {
addByteBuf(buffer, Unpooled.wrappedBuffer(new byte[]{0, 1, 2, 3}));
insertString(buffer, 4, Unpooled.wrappedBuffer(new byte[]{5}));
test = Unpooled.wrappedBuffer(new byte[]{0, 1, 2, 3, 5});
assertEquals(test, buffer);
}
@Test
public void testInsertionSplitEnd() {
addByteBuf(buffer, Unpooled.wrappedBuffer(new byte[]{0, 1, 2, 3}));
addByteBuf(buffer, Unpooled.wrappedBuffer(new byte[]{0, 1, 2, 3}));
insertString(buffer, 6, Unpooled.wrappedBuffer(new byte[]{5}));
test = Unpooled.wrappedBuffer(new byte[]{0, 1, 2, 3, 0, 1, 5, 2, 3});
assertEquals(test, buffer);
}
}