字符串的循环缓冲区

Circular Buffer for Strings

我正在缓冲进程的 stdout、stderr 和 stdin 的最后 X 行。 我想保留最后的 X 行并能够通过其 id(行号)访问一行。 因此,如果我们存储 100 行并插入其中的 200 行,您可以访问第 100-200 行。 (实际上我们想要存储 ~2000 行。)

性能案例是插入。所以插入本身应该很快。检索偶尔会发生,但可能占用例的 10%。 (大部分时间我们不会查看输出。)

旧方法,分段
我使用了一个包装 ArrayDeque 然后保留了行数,但这意味着我们在上面的例子中使用了 [Vec<u8>;100]。一个字符串数组因此是一个 Vec<u8>.

数组

新方法,有未解决的问题
我的*新想法是将数据存储在一个 u8 数组中,然后记录数组中每个条目的起始位置和长度。这里的问题是我们需要簿记也是某种环形缓冲区,并在我们的数据数组必须换行时擦除旧条目。也许还有更好的方法来实现这个?至少这充分利用了环形缓冲区并防止内存碎片。

*也感谢 Rust 社区的 sebk

当前的简单方法

const MAX: usize = 5;

pub struct LineRingBuffer {
    counter: Option<usize>,
    data: ArrayDeque<[String; MAX], Wrapping>,
    min_line: usize,
}

impl LineRingBuffer {
    pub fn new() -> Self {
        Self {
            counter: None,
            data: ArrayDeque::new(),
            min_line: 0,
        }
    }

    pub fn get<'a>(&'a self,pos: usize) -> Option<&String> {
        if let Some(max) = self.counter {
            if pos >= self.min_line && pos <= max {
                return self.data.get(pos - self.min_line);
            }
        }
        None
    }

    pub fn insert(&mut self, line: String) {
        self.data.push_back(line);
        if let Some(ref mut v) = self.counter {
            *v += 1;
            if *v - self.min_line >= MAX {
                self.min_line += 1;
            }
        } else {
            self.counter = Some(0);
        }
    }
}

新思路草案质疑:

pub struct SliceRingbuffer {
    counter: Option<usize>,
    min_line: usize,
    data: Box<[u8;250_000]>,
    index: ArrayDeque<Entry,Wrapping>,
}

struct Entry {
    start: usize,
    length: usize,
}

无论出于何种原因,当前的方法仍然非常快,尽管我预计会有很多不同大小的分配(取决于行)并因此产生碎片。

让我们回到基础。

循环缓冲区通常保证没有碎片,因为它不是按您存储的内容分类的,而是按大小分类的。例如,您可以定义一个 1MB 的循环缓冲区。对于固定长度类型,这为您提供了可以存储的固定数量的元素。

你显然没有这样做。通过将 Vec<u8> 存储为一个元素,即使总体数组是固定长度的,内容也不是。数组中存储的每个元素都是一个指向 Vec(起始点和长度)的胖指针。

当然,当您插入时,您将不得不:

  1. 创建这个 Vec(这是您正在考虑的碎片,但实际上并没有看到,因为 Rust 分配器在处理这类事情时非常有效)
  2. 将 vec 插入到它应该在的位置,必要时将所有东西横向移动(标准循环缓冲区技术在这里发挥作用)

您的第二个选择是实际 循环缓冲区。如果操作正确,您将获得固定大小和零分配,您将失去存储 整行 行的能力,并且 100% 保证在缓冲区的开头有整行。

在我们进入 DYI 的广阔领域之前,需要快速指向 VecDeque。这是您实施的优化得多的版本,尽管有一些(完全有保证的)unsafe 部分。


实现我们自己的循环缓冲区

我们将做出一系列假设并为此设置一些要求:

  • 我们希望能够存储大字符串
  • 我们的缓冲区存储字节。因此,整个堆栈正在处理拥有的 u8
  • 我们将使用一个简单的Vec;实际上你根本不会重新实现整个结构,数组纯粹是为了演示

这些选择的结果是以下元素结构:

| Element size | Data     |
|--------------|----------|
|  4 bytes     |  N bytes |

因此,我们在每条消息之前丢失 4 个字节,以便能够清楚地 pointer/skip 引用下一个元素(最大大小与 u32 相当)。

一个简单的实现示例如下(playground link):

use byteorder::{NativeEndian, ReadBytesExt, WriteBytesExt};

pub struct CircularBuffer {
    data: Vec<u8>,
    tail: usize,
    elements: usize,
}
impl CircularBuffer {
    pub fn new(max: usize) -> Self {
        CircularBuffer {
            data: Vec::with_capacity(max),
            elements: 0,
            tail: 0,
        }
    }

    /// Amount of elements in buffer
    pub fn elements(&self) -> usize {
        self.elements
    }

    /// Amount of used bytes in buffer, including metadata
    pub fn len(&self) -> usize {
        self.tail
    }

    /// Length of first element in ringbuffer
    pub fn next_element_len(&self) -> Option<usize> {
        self.data
            .get(0..4)
            .and_then(|mut v| v.read_u32::<NativeEndian>().ok().map(|r| r as usize))
    }

    /// Remove first element in ringbuffer (wrap)
    pub fn pop(&mut self) -> Option<Vec<u8>> {
        self.next_element_len().map(|chunk_size| {
            self.tail -= chunk_size + 4;
            self.elements -= 1;
            self.data
                .splice(..(chunk_size + 4), vec![])
                .skip(4)
                .collect()
        })
    }

    pub fn get(&self, idx: usize) -> Option<&[u8]> {
        if self.elements <= idx {
            return None;
        }
        let mut current_head = 0;
        let mut current_element = 0;
        while current_head < self.len() - 4 {
            // Get the length of the next block
            let element_size = self
                .data
                .get(0..4)
                .and_then(|mut v| v.read_u32::<NativeEndian>().ok().map(|r| r as usize))
                .unwrap();
            if current_element == idx {
                return self
                    .data
                    .get((current_head + 4)..(current_head + element_size + 4));
            }
            current_element += 1;
            current_head += 4 + element_size;
        }
        return None;
    }

    pub fn insert(&mut self, mut element: Vec<u8>) {
        let e_len = element.len();

        let capacity = self.data.capacity();
        while self.len() + e_len + 4 > capacity {
            self.pop();
        }
        self.data.write_u32::<NativeEndian>(e_len as u32).unwrap();
        self.data.append(&mut element);
        self.tail += 4 + e_len;
        self.elements += 1;
        println!("{:?}", self.data);
    }
}

请再次注意,这是一个 朴素的 实现,旨在向您展示如何解决在缓冲区中剪切字符串的问题。 "real",最佳实现将 unsafe 移动和删除元素。