C++ 中的高效循环缓冲区,将传递给 C 样式数组函数参数
Efficient circular buffer in C++ which will be passed to C-style array function parameter
我正在寻求有关我解决以下问题的方法的建议。我有一个恒定的数据输入需要添加到我的缓冲区,并且在每次迭代中,我需要将缓冲数据传递给一个函数,该函数通过指针接受 C 样式数组。
我担心效率问题,所以我思考如何在某种循环缓冲区中存储和管理数据,同时将其作为顺序原始数据传递给上述函数。
我目前的方法可以总结为以下示例:
#include <iostream>
#include <array>
#include <algorithm>
void foo(double* arr, int size)
{
for (uint k = 0; k < size; k++)
std::cout << arr[k] << ", ";
std::cout << std::endl;
}
int main()
{
const int size = 20;
std::array<double, size> buffer{};
for (double data = 0.0; data < 50.0; data += 1.0)
{
std::move(std::next(std::begin(buffer)), std::end(buffer), std::begin(buffer));
buffer.back() = data;
foo(buffer.data(), size);
}
}
在实际用例中,缓冲区还需要在开始时填充到“const”大小的数据(我在这里使用引号是因为大小在编译时可能知道,也可能不知道,但是一旦众所周知,它永远不会改变)。
我将数据存储在 std::array
中(如果在编译时不知道大小,则存储在 std::vector
中)因为数据在内存中是顺序的。当我需要插入新数据时,我使用 forward std::move
移动所有内容,然后我手动替换最后一项。最后,我只是将 std::array::data()
及其大小传递给函数。
虽然乍一看这应该可以有效地工作,但理性告诉我,因为数据是顺序存储的,所以整个缓冲区仍然会用 std::move
复制,每次插入将是 O(n)
真正的缓冲区大小可能只有数百个,数据到达的频率最大为 100Hz,但问题是我需要尽快获得被调用函数的结果,所以我不想在缓冲区上浪费时间管理(即使我们说的很少,甚至比 ms 还少)。我对此有很多疑问,但他们的候选名单如下:
- 我的做法是不是太天真了?
- 我关于 O(n) 的推理是否正确?
- 这种方法还有其他缺陷吗?
- 你对我应该研究的其他方法有什么建议吗?
您将始终需要复制您的数据,因为不存在“连续”环形缓冲区(也许在某些高级芯片中确实存在)。
此外,您不能初始化运行时定义大小的数组模板。
您可以使用矢量来实现此目的:
#include <iostream>
#include <chrono>
#include <deque>
#include <vector>
int main() {
std::vector<double> v;
// pre fill it a little
for(double data = 0.0; data > -50000.0; data -= 1.0) {
v.push_back(data);
}
size_t cnt = 0;
int duration = 0;
int max = 0;
for(double data = 0.0; data < 50000.0; data += 1.0, ++cnt) {
auto t1 = std::chrono::high_resolution_clock::now();
v.push_back(data);
v.erase(v.begin());
// foo(v.data(), v.size());
auto t2 = std::chrono::high_resolution_clock::now();
auto delta = std::chrono::duration_cast<std::chrono::microseconds>( t2 - t1 ).count();
duration += delta;
if(max == 0 || max < delta) {
max = delta;
}
}
std::cout << "it took an average of " << duration / cnt << "us and a max of " << max << " us" << std::endl;
return 0;
}
输出:
it took an average of 11us and a max of 245 us
谢谢维尔纳的回答。当我在 Repl.it 上 运行 这个解决方案时,我得到:
it took an average of 21us and a max of 57382us
为了比较,我最初的想法具有相同的缓冲区大小,结果如下:
it took an average of 19us and a max of 54129us
这意味着我最初的做法确实很幼稚:)
同时,在等待答案的同时,我想出了以下解决方案:
#include <iostream>
#include <array>
#include <algorithm>
#include <chrono>
void foo(double* arr, int size)
{
for (uint k = 0; k < size; k++)
std::cout << arr[k] << ", ";
std::cout << std::endl;
}
int main()
{
const int buffer_size = 20;
std::array<double, buffer_size*2> buffer{};
int buffer_idx = buffer_size;
for (double data = 0.0; data < 100.0; data += 1.0)
{
buffer.at(buffer_idx - buffer_size) = data;
buffer.at(buffer_idx++) = data;
foo(buffer.data() + buffer_idx - buffer_size, buffer_size);
buffer_idx -= buffer_size * (buffer_idx == buffer_size * 2);
}
}
由于缓冲区的大小不是问题,我分配了两倍所需的内存并在两个地方插入数据,由缓冲区大小偏移。当我到达终点时,我就像打字机一样回去。这个想法是我通过存储一份数据的副本来伪造循环缓冲区,这样它就可以读取数据,就好像它穿过了整圈一样。
对于 50000 的缓冲区大小,这给出了我想要的以下结果:
it took an average of 0us and a max of 23us
除了 stribor14 的 ,我还有另外两个建议。这些仅基于性能,因此在这里不会真正找到可读或可维护的代码。
我读这个问题的第一个想法也是分配两倍的存储量但只写一次。当所有地方都写完后,下半部分将复制到上半部分。我的第一直觉告诉我这可能是一个更好的表现。我的推理是,将发生相同数量的总写入,但所有写入都是顺序的(而不是每秒跳转到数组中的另一个位置)。
#include <cstddef>
#include <cstring>
#include <array>
const size_t buffer_size = 50'000;
int main()
{
std::array<double, 2 * buffer_size> buffer{};
double *index = buffer.data();
double *mid = index + buffer_size;
for (double data = 0.0; data < 10 * buffer_size; data += 1.0)
{
if (index == mid)
{
index = buffer.data();
std::memcpy(index, mid, buffer_size * sizeof(double));
}
*(index++ + buffer_size) = data;
foo(index, buffer_size);
}
}
或者我认为可以优化 OP 自己的答案以删除数组访问。这个想法是 buffer[buffer_idx - buffer_size]
需要 2 次加法来计算该值的位置,即:*(buffer + buffer_idx - buffer_size)
。如果 buffer_idx
包含指针,则只需要添加一次。这给出了以下代码:
#include <cstddef>
#include <array>
const size_t buffer_size = 50'000;
int main()
{
std::array<double, buffer_size * 2> buffer{};
double *index = buffer.data();
double *mid = buffer.data() + buffer_size;
for (double data = 0.0; data < 10 * buffer_size; data += 1.0)
{
*index = data;
*(index + buffer_size) = data;
++index;
index -= buffer_size * (index == mid);
foo(index, buffer_size);
}
}
现在我注意到我正在深入 C++ 优化的兔子洞。所以我们不能就此止步。要选择要使用的实现,我想 运行 一个基准。 Werner Pirkl 给出了 。但是 运行 在我们的优化代码上这样做是没有意义的,因为测量时间是 0μs。所以让我们稍微改变一下我在基准测试中写了一个循环给它一些 运行 时间并想出了:
const int repeats = 1000;
volatile double *ptr;
int duration = 0;
const size_t buffer_size = 50'000;
// ... Set up of the buffers and indices
for (int i = 0; i < repeats; ++i)
{
auto t1 = std::chrono::high_resolution_clock::now();
for (double data = 0.0; data < 10 * buffer_size; data += 1.0)
{
// ... add data to circular buffer
ptr = // ... the start of the array
}
auto t2 = std::chrono::high_resolution_clock::now();
duration += std::chrono::duration_cast<std::chrono::microseconds>(t2 - t1).count();
}
(注意使用 volatile double *
以确保指向连续数组的原始指针未被优化。)
在 运行 进行这些测试时,我注意到它们非常依赖于编译器标志 (-O2 -O3 -march=native ...)。我将给出一些结果,但与所有 C++ 基准测试一样,请对它持保留态度,并且 运行 你自己的真实工作负载。 (报告的时间是每次插入的平均 ns)
with `memcpy` stribor14 `operator[]` with pointers
|---------------|-----------|--------------|---------------|
-O2 | 1.38 | 1.57 | 1.41 | 1.15 |
-O3 | 1.37 | 1.63 | 1.36 | 1.09 |
-O3 -march=native | 1.35 | 1.61 | 1.34 | 1.09 |
不用说:我对我认为应该表现最好的东西感到非常失望。但如前所述,此基准绝不代表任何现实世界的性能。
我正在寻求有关我解决以下问题的方法的建议。我有一个恒定的数据输入需要添加到我的缓冲区,并且在每次迭代中,我需要将缓冲数据传递给一个函数,该函数通过指针接受 C 样式数组。
我担心效率问题,所以我思考如何在某种循环缓冲区中存储和管理数据,同时将其作为顺序原始数据传递给上述函数。
我目前的方法可以总结为以下示例:
#include <iostream>
#include <array>
#include <algorithm>
void foo(double* arr, int size)
{
for (uint k = 0; k < size; k++)
std::cout << arr[k] << ", ";
std::cout << std::endl;
}
int main()
{
const int size = 20;
std::array<double, size> buffer{};
for (double data = 0.0; data < 50.0; data += 1.0)
{
std::move(std::next(std::begin(buffer)), std::end(buffer), std::begin(buffer));
buffer.back() = data;
foo(buffer.data(), size);
}
}
在实际用例中,缓冲区还需要在开始时填充到“const”大小的数据(我在这里使用引号是因为大小在编译时可能知道,也可能不知道,但是一旦众所周知,它永远不会改变)。
我将数据存储在 std::array
中(如果在编译时不知道大小,则存储在 std::vector
中)因为数据在内存中是顺序的。当我需要插入新数据时,我使用 forward std::move
移动所有内容,然后我手动替换最后一项。最后,我只是将 std::array::data()
及其大小传递给函数。
虽然乍一看这应该可以有效地工作,但理性告诉我,因为数据是顺序存储的,所以整个缓冲区仍然会用 std::move
复制,每次插入将是 O(n)
真正的缓冲区大小可能只有数百个,数据到达的频率最大为 100Hz,但问题是我需要尽快获得被调用函数的结果,所以我不想在缓冲区上浪费时间管理(即使我们说的很少,甚至比 ms 还少)。我对此有很多疑问,但他们的候选名单如下:
- 我的做法是不是太天真了?
- 我关于 O(n) 的推理是否正确?
- 这种方法还有其他缺陷吗?
- 你对我应该研究的其他方法有什么建议吗?
您将始终需要复制您的数据,因为不存在“连续”环形缓冲区(也许在某些高级芯片中确实存在)。
此外,您不能初始化运行时定义大小的数组模板。
您可以使用矢量来实现此目的:
#include <iostream>
#include <chrono>
#include <deque>
#include <vector>
int main() {
std::vector<double> v;
// pre fill it a little
for(double data = 0.0; data > -50000.0; data -= 1.0) {
v.push_back(data);
}
size_t cnt = 0;
int duration = 0;
int max = 0;
for(double data = 0.0; data < 50000.0; data += 1.0, ++cnt) {
auto t1 = std::chrono::high_resolution_clock::now();
v.push_back(data);
v.erase(v.begin());
// foo(v.data(), v.size());
auto t2 = std::chrono::high_resolution_clock::now();
auto delta = std::chrono::duration_cast<std::chrono::microseconds>( t2 - t1 ).count();
duration += delta;
if(max == 0 || max < delta) {
max = delta;
}
}
std::cout << "it took an average of " << duration / cnt << "us and a max of " << max << " us" << std::endl;
return 0;
}
输出:
it took an average of 11us and a max of 245 us
谢谢维尔纳的回答。当我在 Repl.it 上 运行 这个解决方案时,我得到:
it took an average of 21us and a max of 57382us
为了比较,我最初的想法具有相同的缓冲区大小,结果如下:
it took an average of 19us and a max of 54129us
这意味着我最初的做法确实很幼稚:)
同时,在等待答案的同时,我想出了以下解决方案:
#include <iostream>
#include <array>
#include <algorithm>
#include <chrono>
void foo(double* arr, int size)
{
for (uint k = 0; k < size; k++)
std::cout << arr[k] << ", ";
std::cout << std::endl;
}
int main()
{
const int buffer_size = 20;
std::array<double, buffer_size*2> buffer{};
int buffer_idx = buffer_size;
for (double data = 0.0; data < 100.0; data += 1.0)
{
buffer.at(buffer_idx - buffer_size) = data;
buffer.at(buffer_idx++) = data;
foo(buffer.data() + buffer_idx - buffer_size, buffer_size);
buffer_idx -= buffer_size * (buffer_idx == buffer_size * 2);
}
}
由于缓冲区的大小不是问题,我分配了两倍所需的内存并在两个地方插入数据,由缓冲区大小偏移。当我到达终点时,我就像打字机一样回去。这个想法是我通过存储一份数据的副本来伪造循环缓冲区,这样它就可以读取数据,就好像它穿过了整圈一样。
对于 50000 的缓冲区大小,这给出了我想要的以下结果:
it took an average of 0us and a max of 23us
除了 stribor14 的
我读这个问题的第一个想法也是分配两倍的存储量但只写一次。当所有地方都写完后,下半部分将复制到上半部分。我的第一直觉告诉我这可能是一个更好的表现。我的推理是,将发生相同数量的总写入,但所有写入都是顺序的(而不是每秒跳转到数组中的另一个位置)。
#include <cstddef>
#include <cstring>
#include <array>
const size_t buffer_size = 50'000;
int main()
{
std::array<double, 2 * buffer_size> buffer{};
double *index = buffer.data();
double *mid = index + buffer_size;
for (double data = 0.0; data < 10 * buffer_size; data += 1.0)
{
if (index == mid)
{
index = buffer.data();
std::memcpy(index, mid, buffer_size * sizeof(double));
}
*(index++ + buffer_size) = data;
foo(index, buffer_size);
}
}
或者我认为可以优化 OP 自己的答案以删除数组访问。这个想法是 buffer[buffer_idx - buffer_size]
需要 2 次加法来计算该值的位置,即:*(buffer + buffer_idx - buffer_size)
。如果 buffer_idx
包含指针,则只需要添加一次。这给出了以下代码:
#include <cstddef>
#include <array>
const size_t buffer_size = 50'000;
int main()
{
std::array<double, buffer_size * 2> buffer{};
double *index = buffer.data();
double *mid = buffer.data() + buffer_size;
for (double data = 0.0; data < 10 * buffer_size; data += 1.0)
{
*index = data;
*(index + buffer_size) = data;
++index;
index -= buffer_size * (index == mid);
foo(index, buffer_size);
}
}
现在我注意到我正在深入 C++ 优化的兔子洞。所以我们不能就此止步。要选择要使用的实现,我想 运行 一个基准。 Werner Pirkl 给出了
const int repeats = 1000;
volatile double *ptr;
int duration = 0;
const size_t buffer_size = 50'000;
// ... Set up of the buffers and indices
for (int i = 0; i < repeats; ++i)
{
auto t1 = std::chrono::high_resolution_clock::now();
for (double data = 0.0; data < 10 * buffer_size; data += 1.0)
{
// ... add data to circular buffer
ptr = // ... the start of the array
}
auto t2 = std::chrono::high_resolution_clock::now();
duration += std::chrono::duration_cast<std::chrono::microseconds>(t2 - t1).count();
}
(注意使用 volatile double *
以确保指向连续数组的原始指针未被优化。)
在 运行 进行这些测试时,我注意到它们非常依赖于编译器标志 (-O2 -O3 -march=native ...)。我将给出一些结果,但与所有 C++ 基准测试一样,请对它持保留态度,并且 运行 你自己的真实工作负载。 (报告的时间是每次插入的平均 ns)
with `memcpy` stribor14 `operator[]` with pointers
|---------------|-----------|--------------|---------------|
-O2 | 1.38 | 1.57 | 1.41 | 1.15 |
-O3 | 1.37 | 1.63 | 1.36 | 1.09 |
-O3 -march=native | 1.35 | 1.61 | 1.34 | 1.09 |
不用说:我对我认为应该表现最好的东西感到非常失望。但如前所述,此基准绝不代表任何现实世界的性能。