如何在满时覆盖的无锁 SPSC 环形缓冲区上区分空和满?
How to distinguish empty from full on a lock-free SPSC ring buffer that overwrites when full?
在裸机微控制器实时嵌入式系统上,我在覆盖模式下使用简单的单一生产者单一消费者 (SPSC) 环形缓冲区*。因为它是 SPSC,它不需要锁,可以简单地做:
void write(data) {
buf[writeptr] = data;
tmp = (writeptr + 1) % BUF_SIZE; // Avoid writeptr++ so that modification of writeptr is atomic
writeptr = tmp;
}
T read() {
d = buf[readptr];
tmp = (readptr + 1) % BUF_SIZE;
readptr = tmp;
}
bool is_empty() {
return writeptr == readptr;
}
然而,问题在于,溢出不仅会丢失被覆盖的一个数据:它会导致缓冲区认为它是空的,实际上会丢失 all 数据。而且不只是 is_empty
错了一次:想象一下缓冲区溢出然后写入了 2 个数据。读取这 2 个数据后,缓冲区将再次显示为空,即使其中确实有 BUF_SIZE - 2
数据。
在没有锁的情况下解决这个问题并不容易。我的目标:
- 单个生产者、单个消费者环形缓冲区
- 溢出时覆盖(即覆盖最旧的数据)
- 适用于轻量级、实时、裸机、嵌入式微控制器
- 理想情况下没有锁
这可能吗?如果不是:实现其他目标的最小锁使用量是多少?
(平台:ARM Cortex M3 裸机)
- "Ring buffers 可用于覆盖模式或
producer/consumer 模式...覆盖模式是如果生产者在消费者可以释放任何东西之前填满缓冲区,生产者将覆盖旧数据。这将丢失最早的事件。"
这相对简单,只要您的缓冲区不大于 64kB(以便指针适合 16 位)。
将您的读写指针放在一个 32 位字的两个 16 位半部分中。如果缓冲区已满,则在编写器中使用 LDREX 和 STREX 操作以原子方式调整读取指针以丢弃最旧的字节。
修改伪代码:
void write(data) {
buf[writeptr] = data;
tmp = (writeptr + 1) % BUF_SIZE;
ATOMIC {
writeptr = tmp;
if (readptr == tmp) {
readptr = (tmp + 1) % BUF_SIZE;
}
}
}
在裸机微控制器实时嵌入式系统上,我在覆盖模式下使用简单的单一生产者单一消费者 (SPSC) 环形缓冲区*。因为它是 SPSC,它不需要锁,可以简单地做:
void write(data) {
buf[writeptr] = data;
tmp = (writeptr + 1) % BUF_SIZE; // Avoid writeptr++ so that modification of writeptr is atomic
writeptr = tmp;
}
T read() {
d = buf[readptr];
tmp = (readptr + 1) % BUF_SIZE;
readptr = tmp;
}
bool is_empty() {
return writeptr == readptr;
}
然而,问题在于,溢出不仅会丢失被覆盖的一个数据:它会导致缓冲区认为它是空的,实际上会丢失 all 数据。而且不只是 is_empty
错了一次:想象一下缓冲区溢出然后写入了 2 个数据。读取这 2 个数据后,缓冲区将再次显示为空,即使其中确实有 BUF_SIZE - 2
数据。
在没有锁的情况下解决这个问题并不容易。我的目标:
- 单个生产者、单个消费者环形缓冲区
- 溢出时覆盖(即覆盖最旧的数据)
- 适用于轻量级、实时、裸机、嵌入式微控制器
- 理想情况下没有锁
这可能吗?如果不是:实现其他目标的最小锁使用量是多少?
(平台:ARM Cortex M3 裸机)
- "Ring buffers 可用于覆盖模式或 producer/consumer 模式...覆盖模式是如果生产者在消费者可以释放任何东西之前填满缓冲区,生产者将覆盖旧数据。这将丢失最早的事件。"
这相对简单,只要您的缓冲区不大于 64kB(以便指针适合 16 位)。
将您的读写指针放在一个 32 位字的两个 16 位半部分中。如果缓冲区已满,则在编写器中使用 LDREX 和 STREX 操作以原子方式调整读取指针以丢弃最旧的字节。
修改伪代码:
void write(data) {
buf[writeptr] = data;
tmp = (writeptr + 1) % BUF_SIZE;
ATOMIC {
writeptr = tmp;
if (readptr == tmp) {
readptr = (tmp + 1) % BUF_SIZE;
}
}
}