C 简单 RingBuffer - 多线程 - 查找临界区
C Simple RingBuffer - Multithreading - Finding Critical Sections
所以我写了一个简单的 C 环形缓冲区,我现在正在使用多线程进行测试,我很难尝试让代码失败,以便我可以识别关键部分。
注意:代码在 C 中,但我在 C++ 文件中对其进行测试,因为它更容易创建线程互斥等。
头文件:
#ifndef _C_TEST_H_
#define _C_TEST_H_
#include <stdio.h>
#include <mutex>
///////////////////////////////////////////////////////////////////////////////
// Defines and macros
///////////////////////////////////////////////////////////////////////////////
#ifndef __cplusplus
typedef enum { false, true } bool;
#endif
#define RING_BUFFER_SIZE 2000
///////////////////////////////////////////////////////////////////////////////
// Structures, Enumerations, Typedefs
///////////////////////////////////////////////////////////////////////////////
typedef struct Node
{
int val;
struct Node *next;
struct Node *previous;
} Node_T;
typedef enum RB_ERC
{
RB_ERC_NO_ERROR,
RB_ERC_NULL_PTR,
RB_ERC_UNDERFLOW,
RB_ERC_OVERFLOW
} RB_ERC_T;
typedef enum RB_HANDLE_OVERFLOW
{
RB_DECIMATE,
RB_IGNORE_AND_RETURN_ERROR
} RB_HANDLE_OVERFLOW_T;
typedef enum RB_READ_MODE
{
RB_FIFO,
RB_LIFO
} RB_READ_MODE_T;
typedef struct RingBuffer
{
int curSize;
RB_HANDLE_OVERFLOW_T handleOverflow;
struct Node *Write;
struct Node *Read;
Node_T buffer[RING_BUFFER_SIZE];
} RING_BUFFER_T;
///////////////////////////////////////////////////////////////////////////////
// Prototypes
///////////////////////////////////////////////////////////////////////////////
#ifdef __cplusplus
extern "C" {
#endif
RB_ERC_T RB_InitRingBuffer(RING_BUFFER_T *rb_, RB_HANDLE_OVERFLOW_T ifOverflow_);
//Return true if the queue has no elements; false if there are elements on the queue
bool RB_IsEmpty(RING_BUFFER_T *rb_);
//Return true if the queue is full; false if there are seats available
bool RB_IsFull(RING_BUFFER_T *rb_);
//Write N elements (length of the array) to the queue
//Note: array values will be read from element 0 to array length
RB_ERC_T RB_WriteArray(RING_BUFFER_T *rb_, int values_[], int length_);
//Write 1 element
RB_ERC_T RB_Write(RING_BUFFER_T *rb_, int val_);
//Dequeue and read N elements (length of the array) into an array
RB_ERC_T RB_ReadArray(RING_BUFFER_T *rb_, int values_[], int length_, RB_READ_MODE_T readMode_);
//Dequeue and read 1 element
RB_ERC_T RB_Read(RING_BUFFER_T *rb_, int *readVal_, RB_READ_MODE_T readMode_);
#ifdef __cplusplus
}
#endif
#endif //_C_TEST_H_
来源:
#include "CTest.h"
static std::mutex m;
RB_ERC_T RB_InitRingBuffer(RING_BUFFER_T *rb_, RB_HANDLE_OVERFLOW_T handleOverflow_)
{
//m.lock();
RB_ERC_T erc = RB_ERC_NO_ERROR;
int i;
if(rb_ == 0)
{
return RB_ERC_NULL_PTR;
}
//Initialize this instance of the ring buffer
//Both the read/write pointers should start at the same location
rb_->curSize = 0;
rb_->Read = &rb_->buffer[0];
rb_->Write = &rb_->buffer[0];
rb_->handleOverflow = handleOverflow_;
//Build the circular doubly-linked list
for(i = 0; i < RING_BUFFER_SIZE; i++)
{
rb_->buffer[i].val = 0;
if(i == 0)
{
//Sentinal Node found. Point the first node to the last element of the array
rb_->buffer[i].previous = &rb_->buffer[(RING_BUFFER_SIZE - 1)];
rb_->buffer[i].next = &rb_->buffer[i + 1];
}
else if(i < (RING_BUFFER_SIZE - 1) )
{
rb_->buffer[i].next = &rb_->buffer[i + 1];
rb_->buffer[i].previous = &rb_->buffer[i - 1];
}
else
{
//Sentinal node found. Reached the last element in the array; Point the sentinal
//node to the first element in the array to create a circular linked list.
rb_->buffer[i].next = &rb_->buffer[0];
rb_->buffer[i].previous = &rb_->buffer[i - 1];
}
}
//m.unlock();
return erc;
}
bool RB_IsEmpty(RING_BUFFER_T *rb_)
{
//m.lock();
//Note: assume rb is valid.
if(rb_->curSize == 0)
{
return true;
}
else
{
return false;
}
//m.unlock();
}
bool RB_IsFull(RING_BUFFER_T *rb_)
{
//m.lock();
//Note: assume rb is valid.
if(rb_->curSize == RING_BUFFER_SIZE)
{
return true;
}
else
{
return false;
}
//m.unlock();
}
RB_ERC_T RB_WriteArray(RING_BUFFER_T *rb_, int values_[], int length_)
{
//m.lock();
RB_ERC_T erc = RB_ERC_NO_ERROR;
int i;
if(rb_ == 0 || values_ == 0 || length_ == 0)
{
return RB_ERC_NULL_PTR;
}
switch(rb_->handleOverflow)
{
//Increment through the array and enqueue
//If attempting to write more elements than are available on the queue
//Decimate - overwrite old data
//Ignore and return error - Don't write any data and throw an error
case RB_DECIMATE:
for(i = 0; i < length_; i++)
{
RB_Write(rb_, values_[i] );
}
break;
default:
case RB_IGNORE_AND_RETURN_ERROR:
{
int numSeatsAvailable = (RING_BUFFER_SIZE - rb_->curSize);
if( length_ <= numSeatsAvailable )
{
//Increment through the array and enqueue
for(i = 0; i < length_; i++)
{
RB_Write(rb_, values_[i] );
}
}
else
{
//Attempted to write more elements than are avaialable on the queue
erc = RB_ERC_OVERFLOW;
}
}
break;
}
//m.unlock();
return erc;
}
RB_ERC_T RB_Write(RING_BUFFER_T *rb_, int val_)
{
//m.lock();
RB_ERC_T erc = RB_ERC_NO_ERROR;
if(rb_ == 0)
{
return RB_ERC_NULL_PTR;
}
if( !RB_IsFull(rb_) )
{
//Write the value to the current location, then increment the write pointer
//so that the write pointer is always pointing 1 element ahead of the queue
rb_->Write->val = val_;
rb_->Write = rb_->Write->next;
rb_->curSize++;
}
else
{
//Overflow
switch(rb_->handleOverflow)
{
case RB_DECIMATE:
//Set the value and increment both the read/write pointers
rb_->Write->val = val_;
rb_->Write = rb_->Write->next;
rb_->Read = rb_->Read->next;
break;
default:
case RB_IGNORE_AND_RETURN_ERROR:
erc = RB_ERC_OVERFLOW;
break;
}
}
//m.unlock();
return erc;
}
RB_ERC_T RB_ReadArray(RING_BUFFER_T *rb_, int values_[], int length_, RB_READ_MODE_T readMode_)
{
//m.lock();
RB_ERC_T erc = RB_ERC_NO_ERROR;
if(values_ == 0)
{
return RB_ERC_NULL_PTR;
}
//Verify that the amount of data to be read is actually available on the queue
if( length_ <= rb_->curSize )
{
//Increment through the array and dequeue
int i;
for(i = 0; i < length_; i++)
{
//Note: Error conditions have already been checked. Skip the ERC check
(void) RB_Read(rb_, &values_[i], readMode_);
}
}
else
{
//Attempted to read more data than is available on the queue
erc = RB_ERC_UNDERFLOW;
}
//m.unlock();
return erc;
}
RB_ERC_T RB_Read(RING_BUFFER_T *rb_, int *readVal_, RB_READ_MODE_T readMode_)
{
//m.lock();
RB_ERC_T erc = RB_ERC_NO_ERROR;
if(rb_ == 0 || readVal_ == 0)
{
return RB_ERC_NULL_PTR;
}
if( !RB_IsEmpty(rb_) )
{
switch(readMode_)
{
case RB_LIFO:
//Use the head (Write) to read the most recently written value (newest data)
//Note: The write pointer is always pointing 1 position ahead of the current queue.
rb_->Write = rb_->Write->previous; //Decrement write pointer
//Read the data
*readVal_ = rb_->Write->val;
rb_->Write->val = 0; //Reset read values to 0
break;
default:
case RB_FIFO:
*readVal_ = rb_->Read->val;
rb_->Read->val = 0; //Reset read values to 0
rb_->Read = rb_->Read->next; //Increment read pointer
break;
}
rb_->curSize--;
}
else
{
//Attempted to read more data but there is no data available on the queue
erc = RB_ERC_UNDERFLOW;
}
//m.unlock();
return erc;
}
用于测试的主要 CPP:
#include "CTest.h"
#include <iostream>
#include "windows.h"
#include <thread>
using namespace std;
static RING_BUFFER_T test1;
const int dataSize = 300;
const int dataSizeout = 1000;
int sharedValue = 0;
static std::mutex m;
void function1()
{
int data[dataSize];
RB_ERC_T erc = RB_ERC_NO_ERROR;
for (int i = 0; i < dataSizeout; i++)
{
erc = RB_Write(&test1, i);
if (erc != RB_ERC_NO_ERROR)
{
printf("Count down errrror %d\n", erc);
}
}
//RB_WriteArray(&test1, data, dataSize);
}
void function2()
{
RB_ERC_T erc = RB_ERC_NO_ERROR;
for (int i = 0; i > -dataSizeout; i--)
{
erc = RB_Write(&test1, i);
if (erc != RB_ERC_NO_ERROR)
{
printf("Count down errrror %d\n", erc);
}
}
}
int main()
{
RB_InitRingBuffer(&test1, RB_DECIMATE);
thread p1(function1);
//Sleep(1000);
thread p2(function2);
p1.join();
p2.join();
//Read out 5 at a time
int out;
int cnt = 0;
while(cnt < (2 * dataSizeout) )
{
if (RB_Read(&test1, &out, RB_LIFO) == RB_ERC_NO_ERROR)
{
printf("out[%d] = %d\n", cnt, out);
cnt += 1;
}
}
system("Pause");
return 0;
}
我认为 RING_BUFFER_T 主实例中的所有内容都将是共享变量,因此无论在哪里使用它们,几乎无处不在,它们都必须包含在互斥锁中。
typedef struct RingBuffer
{
int curSize;
RB_HANDLE_OVERFLOW_T handleOverflow;
struct Node *Write;
struct Node *Read;
Node_T buffer[RING_BUFFER_SIZE];
} RING_BUFFER_T;
我想 NODE_T 也可以,但仅用于初始化。我错了还是不应该将填充在环形缓冲区中的元素乱序放置,因为现在没有使用互斥体?
对于 state-of-the-art 环形缓冲区的 state-of-the-art C 实现,请查看 Linux 内核源代码。这应该让您了解专家是如何做的,它是 battle-proven 代码。请参阅 linux/kfifo.h 和相应的 C 文件。
design description of Linux ring buffer, dunno how up-to-date it is
有关如何在 C++ 中执行此操作的想法,您可以查看
Linux Journal article about C++ lock-free queue
或者看看 boost::lockfree::queue。当然,使用 C++ 可以让您使用泛型类型(模板),例如用 compile-time 绑定调用替换函数指针,从而实现比 C 更好的性能。并且您可以避免那些讨厌的 void* 指针。
您不得公开函数 RB_IsEmpty
和 RB_IsFull
,因为 return 值可能立即无效。如果您仅从 read/write 中调用它们,则无需在该函数中进行保护。
通常,您必须在外部公开的读写函数中保护您的结构,防止第一次访问到最后一次访问。不需要保护参数检查。
你不能双锁。不要从 RB_ReadArray
调用 RB_Read
。提供两者都使用的内部读取功能。写函数也一样。
所以我写了一个简单的 C 环形缓冲区,我现在正在使用多线程进行测试,我很难尝试让代码失败,以便我可以识别关键部分。
注意:代码在 C 中,但我在 C++ 文件中对其进行测试,因为它更容易创建线程互斥等。
头文件:
#ifndef _C_TEST_H_
#define _C_TEST_H_
#include <stdio.h>
#include <mutex>
///////////////////////////////////////////////////////////////////////////////
// Defines and macros
///////////////////////////////////////////////////////////////////////////////
#ifndef __cplusplus
typedef enum { false, true } bool;
#endif
#define RING_BUFFER_SIZE 2000
///////////////////////////////////////////////////////////////////////////////
// Structures, Enumerations, Typedefs
///////////////////////////////////////////////////////////////////////////////
typedef struct Node
{
int val;
struct Node *next;
struct Node *previous;
} Node_T;
typedef enum RB_ERC
{
RB_ERC_NO_ERROR,
RB_ERC_NULL_PTR,
RB_ERC_UNDERFLOW,
RB_ERC_OVERFLOW
} RB_ERC_T;
typedef enum RB_HANDLE_OVERFLOW
{
RB_DECIMATE,
RB_IGNORE_AND_RETURN_ERROR
} RB_HANDLE_OVERFLOW_T;
typedef enum RB_READ_MODE
{
RB_FIFO,
RB_LIFO
} RB_READ_MODE_T;
typedef struct RingBuffer
{
int curSize;
RB_HANDLE_OVERFLOW_T handleOverflow;
struct Node *Write;
struct Node *Read;
Node_T buffer[RING_BUFFER_SIZE];
} RING_BUFFER_T;
///////////////////////////////////////////////////////////////////////////////
// Prototypes
///////////////////////////////////////////////////////////////////////////////
#ifdef __cplusplus
extern "C" {
#endif
RB_ERC_T RB_InitRingBuffer(RING_BUFFER_T *rb_, RB_HANDLE_OVERFLOW_T ifOverflow_);
//Return true if the queue has no elements; false if there are elements on the queue
bool RB_IsEmpty(RING_BUFFER_T *rb_);
//Return true if the queue is full; false if there are seats available
bool RB_IsFull(RING_BUFFER_T *rb_);
//Write N elements (length of the array) to the queue
//Note: array values will be read from element 0 to array length
RB_ERC_T RB_WriteArray(RING_BUFFER_T *rb_, int values_[], int length_);
//Write 1 element
RB_ERC_T RB_Write(RING_BUFFER_T *rb_, int val_);
//Dequeue and read N elements (length of the array) into an array
RB_ERC_T RB_ReadArray(RING_BUFFER_T *rb_, int values_[], int length_, RB_READ_MODE_T readMode_);
//Dequeue and read 1 element
RB_ERC_T RB_Read(RING_BUFFER_T *rb_, int *readVal_, RB_READ_MODE_T readMode_);
#ifdef __cplusplus
}
#endif
#endif //_C_TEST_H_
来源:
#include "CTest.h"
static std::mutex m;
RB_ERC_T RB_InitRingBuffer(RING_BUFFER_T *rb_, RB_HANDLE_OVERFLOW_T handleOverflow_)
{
//m.lock();
RB_ERC_T erc = RB_ERC_NO_ERROR;
int i;
if(rb_ == 0)
{
return RB_ERC_NULL_PTR;
}
//Initialize this instance of the ring buffer
//Both the read/write pointers should start at the same location
rb_->curSize = 0;
rb_->Read = &rb_->buffer[0];
rb_->Write = &rb_->buffer[0];
rb_->handleOverflow = handleOverflow_;
//Build the circular doubly-linked list
for(i = 0; i < RING_BUFFER_SIZE; i++)
{
rb_->buffer[i].val = 0;
if(i == 0)
{
//Sentinal Node found. Point the first node to the last element of the array
rb_->buffer[i].previous = &rb_->buffer[(RING_BUFFER_SIZE - 1)];
rb_->buffer[i].next = &rb_->buffer[i + 1];
}
else if(i < (RING_BUFFER_SIZE - 1) )
{
rb_->buffer[i].next = &rb_->buffer[i + 1];
rb_->buffer[i].previous = &rb_->buffer[i - 1];
}
else
{
//Sentinal node found. Reached the last element in the array; Point the sentinal
//node to the first element in the array to create a circular linked list.
rb_->buffer[i].next = &rb_->buffer[0];
rb_->buffer[i].previous = &rb_->buffer[i - 1];
}
}
//m.unlock();
return erc;
}
bool RB_IsEmpty(RING_BUFFER_T *rb_)
{
//m.lock();
//Note: assume rb is valid.
if(rb_->curSize == 0)
{
return true;
}
else
{
return false;
}
//m.unlock();
}
bool RB_IsFull(RING_BUFFER_T *rb_)
{
//m.lock();
//Note: assume rb is valid.
if(rb_->curSize == RING_BUFFER_SIZE)
{
return true;
}
else
{
return false;
}
//m.unlock();
}
RB_ERC_T RB_WriteArray(RING_BUFFER_T *rb_, int values_[], int length_)
{
//m.lock();
RB_ERC_T erc = RB_ERC_NO_ERROR;
int i;
if(rb_ == 0 || values_ == 0 || length_ == 0)
{
return RB_ERC_NULL_PTR;
}
switch(rb_->handleOverflow)
{
//Increment through the array and enqueue
//If attempting to write more elements than are available on the queue
//Decimate - overwrite old data
//Ignore and return error - Don't write any data and throw an error
case RB_DECIMATE:
for(i = 0; i < length_; i++)
{
RB_Write(rb_, values_[i] );
}
break;
default:
case RB_IGNORE_AND_RETURN_ERROR:
{
int numSeatsAvailable = (RING_BUFFER_SIZE - rb_->curSize);
if( length_ <= numSeatsAvailable )
{
//Increment through the array and enqueue
for(i = 0; i < length_; i++)
{
RB_Write(rb_, values_[i] );
}
}
else
{
//Attempted to write more elements than are avaialable on the queue
erc = RB_ERC_OVERFLOW;
}
}
break;
}
//m.unlock();
return erc;
}
RB_ERC_T RB_Write(RING_BUFFER_T *rb_, int val_)
{
//m.lock();
RB_ERC_T erc = RB_ERC_NO_ERROR;
if(rb_ == 0)
{
return RB_ERC_NULL_PTR;
}
if( !RB_IsFull(rb_) )
{
//Write the value to the current location, then increment the write pointer
//so that the write pointer is always pointing 1 element ahead of the queue
rb_->Write->val = val_;
rb_->Write = rb_->Write->next;
rb_->curSize++;
}
else
{
//Overflow
switch(rb_->handleOverflow)
{
case RB_DECIMATE:
//Set the value and increment both the read/write pointers
rb_->Write->val = val_;
rb_->Write = rb_->Write->next;
rb_->Read = rb_->Read->next;
break;
default:
case RB_IGNORE_AND_RETURN_ERROR:
erc = RB_ERC_OVERFLOW;
break;
}
}
//m.unlock();
return erc;
}
RB_ERC_T RB_ReadArray(RING_BUFFER_T *rb_, int values_[], int length_, RB_READ_MODE_T readMode_)
{
//m.lock();
RB_ERC_T erc = RB_ERC_NO_ERROR;
if(values_ == 0)
{
return RB_ERC_NULL_PTR;
}
//Verify that the amount of data to be read is actually available on the queue
if( length_ <= rb_->curSize )
{
//Increment through the array and dequeue
int i;
for(i = 0; i < length_; i++)
{
//Note: Error conditions have already been checked. Skip the ERC check
(void) RB_Read(rb_, &values_[i], readMode_);
}
}
else
{
//Attempted to read more data than is available on the queue
erc = RB_ERC_UNDERFLOW;
}
//m.unlock();
return erc;
}
RB_ERC_T RB_Read(RING_BUFFER_T *rb_, int *readVal_, RB_READ_MODE_T readMode_)
{
//m.lock();
RB_ERC_T erc = RB_ERC_NO_ERROR;
if(rb_ == 0 || readVal_ == 0)
{
return RB_ERC_NULL_PTR;
}
if( !RB_IsEmpty(rb_) )
{
switch(readMode_)
{
case RB_LIFO:
//Use the head (Write) to read the most recently written value (newest data)
//Note: The write pointer is always pointing 1 position ahead of the current queue.
rb_->Write = rb_->Write->previous; //Decrement write pointer
//Read the data
*readVal_ = rb_->Write->val;
rb_->Write->val = 0; //Reset read values to 0
break;
default:
case RB_FIFO:
*readVal_ = rb_->Read->val;
rb_->Read->val = 0; //Reset read values to 0
rb_->Read = rb_->Read->next; //Increment read pointer
break;
}
rb_->curSize--;
}
else
{
//Attempted to read more data but there is no data available on the queue
erc = RB_ERC_UNDERFLOW;
}
//m.unlock();
return erc;
}
用于测试的主要 CPP:
#include "CTest.h"
#include <iostream>
#include "windows.h"
#include <thread>
using namespace std;
static RING_BUFFER_T test1;
const int dataSize = 300;
const int dataSizeout = 1000;
int sharedValue = 0;
static std::mutex m;
void function1()
{
int data[dataSize];
RB_ERC_T erc = RB_ERC_NO_ERROR;
for (int i = 0; i < dataSizeout; i++)
{
erc = RB_Write(&test1, i);
if (erc != RB_ERC_NO_ERROR)
{
printf("Count down errrror %d\n", erc);
}
}
//RB_WriteArray(&test1, data, dataSize);
}
void function2()
{
RB_ERC_T erc = RB_ERC_NO_ERROR;
for (int i = 0; i > -dataSizeout; i--)
{
erc = RB_Write(&test1, i);
if (erc != RB_ERC_NO_ERROR)
{
printf("Count down errrror %d\n", erc);
}
}
}
int main()
{
RB_InitRingBuffer(&test1, RB_DECIMATE);
thread p1(function1);
//Sleep(1000);
thread p2(function2);
p1.join();
p2.join();
//Read out 5 at a time
int out;
int cnt = 0;
while(cnt < (2 * dataSizeout) )
{
if (RB_Read(&test1, &out, RB_LIFO) == RB_ERC_NO_ERROR)
{
printf("out[%d] = %d\n", cnt, out);
cnt += 1;
}
}
system("Pause");
return 0;
}
我认为 RING_BUFFER_T 主实例中的所有内容都将是共享变量,因此无论在哪里使用它们,几乎无处不在,它们都必须包含在互斥锁中。
typedef struct RingBuffer
{
int curSize;
RB_HANDLE_OVERFLOW_T handleOverflow;
struct Node *Write;
struct Node *Read;
Node_T buffer[RING_BUFFER_SIZE];
} RING_BUFFER_T;
我想 NODE_T 也可以,但仅用于初始化。我错了还是不应该将填充在环形缓冲区中的元素乱序放置,因为现在没有使用互斥体?
对于 state-of-the-art 环形缓冲区的 state-of-the-art C 实现,请查看 Linux 内核源代码。这应该让您了解专家是如何做的,它是 battle-proven 代码。请参阅 linux/kfifo.h 和相应的 C 文件。
design description of Linux ring buffer, dunno how up-to-date it is
有关如何在 C++ 中执行此操作的想法,您可以查看
Linux Journal article about C++ lock-free queue
或者看看 boost::lockfree::queue。当然,使用 C++ 可以让您使用泛型类型(模板),例如用 compile-time 绑定调用替换函数指针,从而实现比 C 更好的性能。并且您可以避免那些讨厌的 void* 指针。
您不得公开函数 RB_IsEmpty
和 RB_IsFull
,因为 return 值可能立即无效。如果您仅从 read/write 中调用它们,则无需在该函数中进行保护。
通常,您必须在外部公开的读写函数中保护您的结构,防止第一次访问到最后一次访问。不需要保护参数检查。
你不能双锁。不要从 RB_ReadArray
调用 RB_Read
。提供两者都使用的内部读取功能。写函数也一样。