libuv 分配的内存缓冲区重用技术
libuv allocated memory buffers re-use techniques
我正在为我的广泛网络交互应用程序使用 libuv,我担心哪些重用分配内存的技术在 libuv 回调延迟执行的情况下会同时有效和安全。
在非常基础的一层,暴露给 libuv 用户,需要在设置句柄的同时指定缓冲区分配回调 reader:
UV_EXTERN int uv_read_start(uv_stream_t*, uv_alloc_cb alloc_cb, uv_read_cb read_cb);
其中 uv_alloc_cb
是
typedef void (*uv_alloc_cb)(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf);
但这就是问题所在:每次通过句柄传入新消息(例如,接收到来自 uv_udp_t
句柄的每个 UDP 数据报)和直接分配新缓冲区时都会调用此内存分配回调对于每个传入的 UDP 数据报,似乎都非常不适合内存。
所以我要求一种常见的 C 技术(可能是在 libuv 回调系统引入的延迟执行上下文中)尽可能重复使用相同的分配内存。
此外,如果可能的话,我想windows-便携。
备注:
- 我知道这个问题:Does libuv provide any facilities to attach a buffer to a connection and re use it;除了说明静态分配的缓冲区是行不通的事实之外,它被接受的答案没有回答如何使用 libuv 实际进行内存分配。特别是,它没有涵盖附加到句柄的缓冲区(通过包装器结构或句柄->数据上下文)。
阅读 http://nikhilm.github.io/uvbook/filesystem.html ,我注意到剪辑 uvtee/main.c - Write to pipe
下的以下短语:
We make a copy so we can free the two buffers from the two calls to write_data independently of each other. While acceptable for a demo program like this, you’ll probably want smarter memory management, like reference counted buffers or a pool of buffers in any major application.
但我找不到任何涉及 libuv 缓冲区引用计数的解决方案(如何正确执行?)或 libuv 环境中缓冲区池的明确示例(是否有任何库?)。
我想分享一下我自己解决这个问题的经验。我能感受到您的痛苦和困惑,但实际上,如果您知道自己在做什么,考虑到您拥有的大量选择,实施一个可行的解决方案并不过分困难。
Objective
实现一个能够执行两个操作的缓冲区池 - acquire 和 release.
基本池化策略:
- acquire 从池中提取缓冲区,有效地将可用缓冲区数减少 1;
- 如果没有可用的缓冲区,则会出现两种选择:
- 增加池和return新创建的缓冲区;或
- 创建并return一个虚拟缓冲区(解释如下)。
- 释放 return将缓冲区返回到池中。
池的大小可以是固定的,也可以是可变的。 "Variable" 意味着最初有 M pre-allocated 个缓冲区(例如零),并且池可以根据需要增长到 N。 "Fixed" 表示所有缓冲区在创建池时都是 pre-allocated (M = N)。
实现一个为 libuv 获取缓冲区的回调。
除了 out-of-memory 情况外,在任何情况下都不允许无限池增长仍然具有池功能。
实施
现在,让我们更详细地了解一下这一切。
池结构:
#define BUFPOOL_CAPACITY 100
typedef struct bufpool_s bufpool_t;
struct bufpool_s {
void *bufs[BUFPOOL_CAPACITY];
int size;
};
size
是当前池大小。
缓冲区本身是一个内存块,前缀为以下结构:
#define bufbase(ptr) ((bufbase_t *)((char *)(ptr) - sizeof(bufbase_t)))
#define buflen(ptr) (bufbase(ptr)->len)
typedef struct bufbase_s bufbase_t;
struct bufbase_s {
bufpool_t *pool;
int len;
};
len
是以字节为单位的缓冲区长度。
新缓冲区的分配如下所示:
void *bufpool_alloc(bufpool_t *pool, int len) {
bufbase_t *base = malloc(sizeof(bufbase_t) + len);
if (!base) return 0;
base->pool = pool;
base->len = len;
return (char *)base + sizeof(bufbase_t);
}
注意returned指针指向header之后的下一个字节——数据区。这允许拥有缓冲区指针,就好像它们是通过对 malloc
.
的标准调用分配的一样
解除分配正好相反:
void bufpool_free(void *ptr) {
if (!ptr) return;
free(bufbase(ptr));
}
libuv 的分配回调如下所示:
void alloc_cb(uv_handle_t *handle, size_t size, uv_buf_t *buf) {
int len;
void *ptr = bufpool_acquire(handle->loop->data, &len);
*buf = uv_buf_init(ptr, len);
}
你可以在这里看到,alloc_cb
在循环中从用户数据指针中获取缓冲池的指针。这意味着缓冲池应该在使用前附加到事件循环。换句话说,您应该在创建循环时初始化池并将其指针分配给 data
字段。如果您已经在该字段中保存了其他用户数据,只需扩展您的结构即可。
A 虚拟缓冲区 是一个假缓冲区,这意味着它不是源自池,但仍然可以正常运行。虚拟缓冲区的目的是让整个事情在池饥饿的罕见情况下工作,即当获得所有缓冲区并且需要另一个缓冲区时。根据我的研究,在所有现代操作系统上都可以非常快速地分配大约 8Kb 的小内存块 - 这非常适合虚拟缓冲区的大小。
#define DUMMY_BUF_SIZE 8000
void *bufpool_dummy() {
return bufpool_alloc(0, DUMMY_BUF_SIZE);
}
获取操作:
void *bufpool_acquire(bufpool_t *pool, int *len) {
void *buf = bufpool_dequeue(pool);
if (!buf) buf = bufpool_dummy();
*len = buf ? buflen(buf) : 0;
return buf;
}
释放操作:
void bufpool_release(void *ptr) {
bufbase_t *base;
if (!ptr) return;
base = bufbase(ptr);
if (base->pool) bufpool_enqueue(base->pool, ptr);
else free(base);
}
这里有两个函数 - bufpool_enqueue
和 bufpool_dequeue
。基本上,他们执行池的所有工作。
在我的例子中,除了上面所说的之外还有一个 O(1) queue 的缓冲区索引,这使我能够更有效地跟踪池的状态,从而非常快速地获取缓冲区索引。没有必要像我那样走极端,因为池的最大大小是有限的,因此任何数组搜索在时间上也是恒定的。
在最简单的情况下,您可以在 bufpool_s
结构中的整个 bufs
数组中将这些函数实现为纯线性搜索器。例如,如果获取缓冲区,则搜索第一个 non-NULL 点,保存指针并将 NULL 放入该点。下次释放缓冲区时,您将搜索第一个 NULL 点并将其指针保存在那里。
池内部结构如下:
#define BUF_SIZE 64000
void *bufpool_grow(bufpool_t *pool) {
int idx = pool->size;
void *buf;
if (idx == BUFPOOL_CAPACITY) return 0;
buf = bufpool_alloc(pool, BUF_SIZE);
if (!buf) return 0;
pool->bufs[idx] = 0;
pool->size = idx + 1;
return buf;
}
void bufpool_enqueue(bufpool_t *pool, void *ptr) {
int idx;
for (idx = 0; idx < pool->size; ++idx) {
if (!pool->bufs[idx]) break;
}
assert(idx < pool->size);
pool->bufs[idx] = ptr;
}
void *bufpool_dequeue(bufpool_t *pool) {
int idx;
void *ptr;
for (idx = 0; idx < pool->size; ++idx) {
ptr = pool->bufs[idx];
if (ptr) {
pool->bufs[idx] = 0;
return ptr;
}
}
return bufpool_grow(pool);
}
正常的缓冲区大小是 64000 字节,因为我希望它能够舒适地放入一个 64Kb 的块中,其 header.
最后,初始化和 de-initialization 例程:
void bufpool_init(bufpool_t *pool) {
pool->size = 0;
}
void bufpool_done(bufpool_t *pool) {
int idx;
for (idx = 0; idx < pool->size; ++idx) bufpool_free(pool->bufs[idx]);
}
请注意,为便于说明,此实施已简化。这里没有池收缩策略,而在现实世界中,很可能需要它。
用法
您现在应该可以编写 libuv 回调了:
void read_cb(uv_stream_t *stream, ssize_t nread, const uv_buf_t *buf) {
/* ... */
bufpool_release(buf->base); /* Release the buffer */
}
循环初始化:
uv_loop_t *loop = malloc(sizeof(*loop));
bufpool_t *pool = malloc(sizeof(*pool));
uv_loop_init(loop);
bufpool_init(pool);
loop->data = pool;
操作:
uv_tcp_t *tcp = malloc(sizeof(*tcp));
uv_tcp_init(tcp);
/* ... */
uv_read_start((uv_handle_t *)tcp, alloc_cb, read_cb);
更新(2016 年 8 月 2 日)
根据请求的大小获取缓冲区时使用自适应策略也是一个好主意,并且仅当大块数据是return池缓冲区时请求(例如所有读取和长写入)。对于其他情况(例如大多数写入),return 虚拟缓冲区。这将有助于避免浪费池缓冲区保持可接受的分配速度。例如:
void alloc_cb(uv_handle_t *handle, size_t size, uv_buf_t *buf) {
int len = size; /* Requested buffer size */
void *ptr = bufpool_acquire(handle->loop->data, &len);
*buf = uv_buf_init(ptr, len);
}
void *bufpool_acquire(bufpool_t *pool, int *len) {
int size = *len;
if (size > DUMMY_BUF_SIZE) {
buf = bufpool_dequeue(pool);
if (buf) {
if (size > BUF_SIZE) *len = BUF_SIZE;
return buf;
}
size = DUMMY_BUF_SIZE;
}
buf = bufpool_alloc(0, size);
*len = buf ? size : 0;
return buf;
}
P.S。此代码段不需要 buflen
和 bufpool_dummy
。
如果您在 Linux,那您就走运了。 Linux 内核通常默认使用所谓的 SLAB Allocator。此分配器的优点是它通过维护可回收块池来减少实际内存分配。这对您来说意味着只要您始终分配相同大小的缓冲区(理想情况下 PAGE_SIZE 的 pow2 大小),您就可以在 Linux.[=14= 上使用 malloc()
]
如果您不在 Linux(或 FreeBSD 或 Solaris)上,或者如果您开发跨平台应用程序,您可以考虑使用 glib 及其 Memory Slices
SLAB 分配器的平台实现。它在支持它的平台上使用本机实现,因此在 Linux 上使用它不会带来任何优势(我 运行 自己进行了一些测试)。我敢肯定还有其他库可以做同样的事情,或者您可以自己实现。
让我们重复回调的函数签名:
void alloc_cb(uv_handle_t* handle, size_t, uv_buf_t*);
我设置 handle->data
指向一个 struct/pair/tuple 例如:
auto t(std::make_tuple(blah1, blah2, blah3));
这允许我与 cb 共享任意数据。我所做的是将 struct/pair/tuple 数据成员之一设置为我的缓冲区:
char data[65536];
那我就用cb中的buffer:
extern "C"
inline void uv_alloc_cb(uv_handle_t* const uvh, std::size_t const sz,
uv_buf_t* const buf) noexcept
{
auto const p(static_cast<std::pair<void*, char*>*>(uvh->data));
buf->base = std::get<1>(*p);
buf->len = 65536;
}
这超级快,不需要动态分配。我认为 libuv
API 有点像 ad-hoc,根本没有经过深思熟虑并且缺乏实施。为什么这个任意 64k 缓冲区要求?如果我不提供 64k,libuv
一点也不高兴,尽管它不会崩溃。
我正在为我的广泛网络交互应用程序使用 libuv,我担心哪些重用分配内存的技术在 libuv 回调延迟执行的情况下会同时有效和安全。
在非常基础的一层,暴露给 libuv 用户,需要在设置句柄的同时指定缓冲区分配回调 reader:
UV_EXTERN int uv_read_start(uv_stream_t*, uv_alloc_cb alloc_cb, uv_read_cb read_cb);
其中 uv_alloc_cb
是
typedef void (*uv_alloc_cb)(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf);
但这就是问题所在:每次通过句柄传入新消息(例如,接收到来自 uv_udp_t
句柄的每个 UDP 数据报)和直接分配新缓冲区时都会调用此内存分配回调对于每个传入的 UDP 数据报,似乎都非常不适合内存。
所以我要求一种常见的 C 技术(可能是在 libuv 回调系统引入的延迟执行上下文中)尽可能重复使用相同的分配内存。
此外,如果可能的话,我想windows-便携。
备注:
- 我知道这个问题:Does libuv provide any facilities to attach a buffer to a connection and re use it;除了说明静态分配的缓冲区是行不通的事实之外,它被接受的答案没有回答如何使用 libuv 实际进行内存分配。特别是,它没有涵盖附加到句柄的缓冲区(通过包装器结构或句柄->数据上下文)。
阅读 http://nikhilm.github.io/uvbook/filesystem.html ,我注意到剪辑
uvtee/main.c - Write to pipe
下的以下短语:We make a copy so we can free the two buffers from the two calls to write_data independently of each other. While acceptable for a demo program like this, you’ll probably want smarter memory management, like reference counted buffers or a pool of buffers in any major application.
但我找不到任何涉及 libuv 缓冲区引用计数的解决方案(如何正确执行?)或 libuv 环境中缓冲区池的明确示例(是否有任何库?)。
我想分享一下我自己解决这个问题的经验。我能感受到您的痛苦和困惑,但实际上,如果您知道自己在做什么,考虑到您拥有的大量选择,实施一个可行的解决方案并不过分困难。
Objective
实现一个能够执行两个操作的缓冲区池 - acquire 和 release.
基本池化策略:
- acquire 从池中提取缓冲区,有效地将可用缓冲区数减少 1;
- 如果没有可用的缓冲区,则会出现两种选择:
- 增加池和return新创建的缓冲区;或
- 创建并return一个虚拟缓冲区(解释如下)。
- 释放 return将缓冲区返回到池中。
池的大小可以是固定的,也可以是可变的。 "Variable" 意味着最初有 M pre-allocated 个缓冲区(例如零),并且池可以根据需要增长到 N。 "Fixed" 表示所有缓冲区在创建池时都是 pre-allocated (M = N)。
实现一个为 libuv 获取缓冲区的回调。
除了 out-of-memory 情况外,在任何情况下都不允许无限池增长仍然具有池功能。
实施
现在,让我们更详细地了解一下这一切。
池结构:
#define BUFPOOL_CAPACITY 100
typedef struct bufpool_s bufpool_t;
struct bufpool_s {
void *bufs[BUFPOOL_CAPACITY];
int size;
};
size
是当前池大小。
缓冲区本身是一个内存块,前缀为以下结构:
#define bufbase(ptr) ((bufbase_t *)((char *)(ptr) - sizeof(bufbase_t)))
#define buflen(ptr) (bufbase(ptr)->len)
typedef struct bufbase_s bufbase_t;
struct bufbase_s {
bufpool_t *pool;
int len;
};
len
是以字节为单位的缓冲区长度。
新缓冲区的分配如下所示:
void *bufpool_alloc(bufpool_t *pool, int len) {
bufbase_t *base = malloc(sizeof(bufbase_t) + len);
if (!base) return 0;
base->pool = pool;
base->len = len;
return (char *)base + sizeof(bufbase_t);
}
注意returned指针指向header之后的下一个字节——数据区。这允许拥有缓冲区指针,就好像它们是通过对 malloc
.
解除分配正好相反:
void bufpool_free(void *ptr) {
if (!ptr) return;
free(bufbase(ptr));
}
libuv 的分配回调如下所示:
void alloc_cb(uv_handle_t *handle, size_t size, uv_buf_t *buf) {
int len;
void *ptr = bufpool_acquire(handle->loop->data, &len);
*buf = uv_buf_init(ptr, len);
}
你可以在这里看到,alloc_cb
在循环中从用户数据指针中获取缓冲池的指针。这意味着缓冲池应该在使用前附加到事件循环。换句话说,您应该在创建循环时初始化池并将其指针分配给 data
字段。如果您已经在该字段中保存了其他用户数据,只需扩展您的结构即可。
A 虚拟缓冲区 是一个假缓冲区,这意味着它不是源自池,但仍然可以正常运行。虚拟缓冲区的目的是让整个事情在池饥饿的罕见情况下工作,即当获得所有缓冲区并且需要另一个缓冲区时。根据我的研究,在所有现代操作系统上都可以非常快速地分配大约 8Kb 的小内存块 - 这非常适合虚拟缓冲区的大小。
#define DUMMY_BUF_SIZE 8000
void *bufpool_dummy() {
return bufpool_alloc(0, DUMMY_BUF_SIZE);
}
获取操作:
void *bufpool_acquire(bufpool_t *pool, int *len) {
void *buf = bufpool_dequeue(pool);
if (!buf) buf = bufpool_dummy();
*len = buf ? buflen(buf) : 0;
return buf;
}
释放操作:
void bufpool_release(void *ptr) {
bufbase_t *base;
if (!ptr) return;
base = bufbase(ptr);
if (base->pool) bufpool_enqueue(base->pool, ptr);
else free(base);
}
这里有两个函数 - bufpool_enqueue
和 bufpool_dequeue
。基本上,他们执行池的所有工作。
在我的例子中,除了上面所说的之外还有一个 O(1) queue 的缓冲区索引,这使我能够更有效地跟踪池的状态,从而非常快速地获取缓冲区索引。没有必要像我那样走极端,因为池的最大大小是有限的,因此任何数组搜索在时间上也是恒定的。
在最简单的情况下,您可以在 bufpool_s
结构中的整个 bufs
数组中将这些函数实现为纯线性搜索器。例如,如果获取缓冲区,则搜索第一个 non-NULL 点,保存指针并将 NULL 放入该点。下次释放缓冲区时,您将搜索第一个 NULL 点并将其指针保存在那里。
池内部结构如下:
#define BUF_SIZE 64000
void *bufpool_grow(bufpool_t *pool) {
int idx = pool->size;
void *buf;
if (idx == BUFPOOL_CAPACITY) return 0;
buf = bufpool_alloc(pool, BUF_SIZE);
if (!buf) return 0;
pool->bufs[idx] = 0;
pool->size = idx + 1;
return buf;
}
void bufpool_enqueue(bufpool_t *pool, void *ptr) {
int idx;
for (idx = 0; idx < pool->size; ++idx) {
if (!pool->bufs[idx]) break;
}
assert(idx < pool->size);
pool->bufs[idx] = ptr;
}
void *bufpool_dequeue(bufpool_t *pool) {
int idx;
void *ptr;
for (idx = 0; idx < pool->size; ++idx) {
ptr = pool->bufs[idx];
if (ptr) {
pool->bufs[idx] = 0;
return ptr;
}
}
return bufpool_grow(pool);
}
正常的缓冲区大小是 64000 字节,因为我希望它能够舒适地放入一个 64Kb 的块中,其 header.
最后,初始化和 de-initialization 例程:
void bufpool_init(bufpool_t *pool) {
pool->size = 0;
}
void bufpool_done(bufpool_t *pool) {
int idx;
for (idx = 0; idx < pool->size; ++idx) bufpool_free(pool->bufs[idx]);
}
请注意,为便于说明,此实施已简化。这里没有池收缩策略,而在现实世界中,很可能需要它。
用法
您现在应该可以编写 libuv 回调了:
void read_cb(uv_stream_t *stream, ssize_t nread, const uv_buf_t *buf) {
/* ... */
bufpool_release(buf->base); /* Release the buffer */
}
循环初始化:
uv_loop_t *loop = malloc(sizeof(*loop));
bufpool_t *pool = malloc(sizeof(*pool));
uv_loop_init(loop);
bufpool_init(pool);
loop->data = pool;
操作:
uv_tcp_t *tcp = malloc(sizeof(*tcp));
uv_tcp_init(tcp);
/* ... */
uv_read_start((uv_handle_t *)tcp, alloc_cb, read_cb);
更新(2016 年 8 月 2 日)
根据请求的大小获取缓冲区时使用自适应策略也是一个好主意,并且仅当大块数据是return池缓冲区时请求(例如所有读取和长写入)。对于其他情况(例如大多数写入),return 虚拟缓冲区。这将有助于避免浪费池缓冲区保持可接受的分配速度。例如:
void alloc_cb(uv_handle_t *handle, size_t size, uv_buf_t *buf) {
int len = size; /* Requested buffer size */
void *ptr = bufpool_acquire(handle->loop->data, &len);
*buf = uv_buf_init(ptr, len);
}
void *bufpool_acquire(bufpool_t *pool, int *len) {
int size = *len;
if (size > DUMMY_BUF_SIZE) {
buf = bufpool_dequeue(pool);
if (buf) {
if (size > BUF_SIZE) *len = BUF_SIZE;
return buf;
}
size = DUMMY_BUF_SIZE;
}
buf = bufpool_alloc(0, size);
*len = buf ? size : 0;
return buf;
}
P.S。此代码段不需要 buflen
和 bufpool_dummy
。
如果您在 Linux,那您就走运了。 Linux 内核通常默认使用所谓的 SLAB Allocator。此分配器的优点是它通过维护可回收块池来减少实际内存分配。这对您来说意味着只要您始终分配相同大小的缓冲区(理想情况下 PAGE_SIZE 的 pow2 大小),您就可以在 Linux.[=14= 上使用 malloc()
]
如果您不在 Linux(或 FreeBSD 或 Solaris)上,或者如果您开发跨平台应用程序,您可以考虑使用 glib 及其 Memory Slices
SLAB 分配器的平台实现。它在支持它的平台上使用本机实现,因此在 Linux 上使用它不会带来任何优势(我 运行 自己进行了一些测试)。我敢肯定还有其他库可以做同样的事情,或者您可以自己实现。
让我们重复回调的函数签名:
void alloc_cb(uv_handle_t* handle, size_t, uv_buf_t*);
我设置 handle->data
指向一个 struct/pair/tuple 例如:
auto t(std::make_tuple(blah1, blah2, blah3));
这允许我与 cb 共享任意数据。我所做的是将 struct/pair/tuple 数据成员之一设置为我的缓冲区:
char data[65536];
那我就用cb中的buffer:
extern "C"
inline void uv_alloc_cb(uv_handle_t* const uvh, std::size_t const sz,
uv_buf_t* const buf) noexcept
{
auto const p(static_cast<std::pair<void*, char*>*>(uvh->data));
buf->base = std::get<1>(*p);
buf->len = 65536;
}
这超级快,不需要动态分配。我认为 libuv
API 有点像 ad-hoc,根本没有经过深思熟虑并且缺乏实施。为什么这个任意 64k 缓冲区要求?如果我不提供 64k,libuv
一点也不高兴,尽管它不会崩溃。