用于顺序处理的高效通用缓冲队列
Efficient generic buffer queue for sequential processing
我有一个由并行程序更新的生产者-消费者队列。查询队列以获取有关当前队列内容的各种统计信息,例如均值或标准差或方差或其他内容。意思是,这是代码,我使用
class BufferQueue {
const int nMaxQueueSize_;
int* values;
int head, tail;
double sum;
::utils::FastMutex queue_mutex;
public:
BufferQueue(const int nMaxQueueSize) :
nMaxQueueSize_(nMaxQueueSize) {
head = tail = 0;
sum = 0;
values = new int[nMaxQueueSize_];
}
void enqueue(int val) {
values[head] = val;
if ((head + 1) % nMaxQueueSize_ == tail) {
queue_mutex.lock();
sum = val.value_point - values[tail].value_point;
utils::memory_barrier();
head = (1 + head) % nMaxQueueSize_;
tail = (1 + tail) % nMaxQueueSize_;
queue_mutex.unlock();
} else {
queue_mutex.lock();
sum += val.value_point;
utils::memory_barrier();
head = (1 + head) % nMaxQueueSize_;
queue_mutex.unlock();
}
}
bool dequeue() {
if (head != tail) {
queue_mutex.lock();
sum -= values[tail].value_point;
utils::memory_barrier();
tail = (1 + tail) % nMaxQueueSize_;
queue_mutex.unlock();
return true;
} else {
sum = 0;
return false;
}
}
MarketSpreadPoint& operator[](int i) {
return values[ (tail + i) % nMaxQueueSize_ ];
}
inline int getSize() {
return (head - tail + nMaxQueueSize_) % nMaxQueueSize_;
}
inline double average() {
queue_mutex.lock();
double result = sum / getSize();
queue_mutex.unlock();
return result;
}
~BufferQueue() {
delete values;
}
};
注意:要记住的一件重要事情是只执行一个操作。我也不想通过编写单独的实现来重复代码,例如 BufferQueueAverage、BufferQueueVariance 等。我想要非常有限的代码冗余(编译器优化)。即使对每次更新的队列类型进行调节似乎也不是最佳选择。
inline double average() {
queue_mutex.lock();
if(type_is_average){
double result = sum / getSize();
}else if(type_is_variance){
/// update accordingly.
}
double result = sum / getSize();
queue_mutex.unlock();
return result;
}
什么可以替代这个想法?
注意:在这个实现中,如果队列满了,head会自动让tail向前移动。换句话说,最旧的元素被自动删除。
谢谢
所以你想把队列和统计分开。我看到两种可能的解决方案:
- 使用像模板方法或策略这样的模式来分解依赖关系。
- 使用执行此操作的模板。
假设您收集的所有统计数据都可以 gathered incrementally,后者可能类似于以下内容(仅作为伪代码):
class StatisticsMean
{
private:
int n = 0;
double mean = 0.0;
public:
void addSample(int s) { ++n; mean += (s - mean) / n; }
void removeSample(int s) { ... }
double getStatistic() const { return mean; }
}
template <typename TStatistics>
class BufferQueue
{
TStatistics statistics;
...
void enqueue(int val)
{
...
statistics.addSample(val);
}
...
double getStatistic() const { return statistics.getStatistic(); }
}
模板方法为您提供了完整的编译时优化。您可以使用 Template Method 模式实现相同的效果。这也将允许您为吸气剂使用不同的名称(上例中的getStatistic()
)。
这看起来可能与此类似:
class AbstractBufferQueue
{
virtual void addSample(int s) = 0;
virtual void removeSample(int s) = 0;
void enqueue(int val)
{
...
addSample(val);
}
}
class BufferQueueAverage : public AbstractBufferQueue
{
int n;
double mean;
void addSample(int s) { ++n; mean += (s - mean) / n; }
void removeSample(int s) { ... }
double getAverage() const { return mean; }
}
完成您要求的一种方法是使用模板 类。
首先,确定 累加器 将具有的通用接口。它可能是这样的:
class accumulator
{
public:
typedef double value_type;
public:
void push(int v); // Called when pushing a new value.
void pop(int v); // Called when popping a new value;
value_type result(size_t n) const; // Returns the current accumulation.
};
作为一个特例,mean_accumulator
可能是这样的:
class mean_accumulator
{
public:
typedef double value_type;
public:
mean_accumulator() : m_sum{0} {}
void push(int v) { m_sum += v; }
void pop(int v); { m_sum -= v; }
double result(size_t n) const { return m_sum / n; };
private:
int m_sum;
};
现在,通过 Accumulator
参数化您的队列,并在必要时调用它(当您使用它时,请注意 boost::circular_buffer
具有实现所需的大部分内容:
template<class Accumulator>
class queue
{
private:
boost::circular_buffer<int> m_buf;
std::mutex m_m;
public:
void push(int v)
{
// Lock the mutex, push to the circular buffer, and the accumulator
}
bool pop()
{
// Lock the mutex; if relevant, update the accumulator and pop the circular buffer
}
typename Accumulator::value_type result() const
{
// Lock the mutex and return the accumulator's result.
}
};
我有一个由并行程序更新的生产者-消费者队列。查询队列以获取有关当前队列内容的各种统计信息,例如均值或标准差或方差或其他内容。意思是,这是代码,我使用
class BufferQueue {
const int nMaxQueueSize_;
int* values;
int head, tail;
double sum;
::utils::FastMutex queue_mutex;
public:
BufferQueue(const int nMaxQueueSize) :
nMaxQueueSize_(nMaxQueueSize) {
head = tail = 0;
sum = 0;
values = new int[nMaxQueueSize_];
}
void enqueue(int val) {
values[head] = val;
if ((head + 1) % nMaxQueueSize_ == tail) {
queue_mutex.lock();
sum = val.value_point - values[tail].value_point;
utils::memory_barrier();
head = (1 + head) % nMaxQueueSize_;
tail = (1 + tail) % nMaxQueueSize_;
queue_mutex.unlock();
} else {
queue_mutex.lock();
sum += val.value_point;
utils::memory_barrier();
head = (1 + head) % nMaxQueueSize_;
queue_mutex.unlock();
}
}
bool dequeue() {
if (head != tail) {
queue_mutex.lock();
sum -= values[tail].value_point;
utils::memory_barrier();
tail = (1 + tail) % nMaxQueueSize_;
queue_mutex.unlock();
return true;
} else {
sum = 0;
return false;
}
}
MarketSpreadPoint& operator[](int i) {
return values[ (tail + i) % nMaxQueueSize_ ];
}
inline int getSize() {
return (head - tail + nMaxQueueSize_) % nMaxQueueSize_;
}
inline double average() {
queue_mutex.lock();
double result = sum / getSize();
queue_mutex.unlock();
return result;
}
~BufferQueue() {
delete values;
}
};
注意:要记住的一件重要事情是只执行一个操作。我也不想通过编写单独的实现来重复代码,例如 BufferQueueAverage、BufferQueueVariance 等。我想要非常有限的代码冗余(编译器优化)。即使对每次更新的队列类型进行调节似乎也不是最佳选择。
inline double average() {
queue_mutex.lock();
if(type_is_average){
double result = sum / getSize();
}else if(type_is_variance){
/// update accordingly.
}
double result = sum / getSize();
queue_mutex.unlock();
return result;
}
什么可以替代这个想法?
注意:在这个实现中,如果队列满了,head会自动让tail向前移动。换句话说,最旧的元素被自动删除。
谢谢
所以你想把队列和统计分开。我看到两种可能的解决方案:
- 使用像模板方法或策略这样的模式来分解依赖关系。
- 使用执行此操作的模板。
假设您收集的所有统计数据都可以 gathered incrementally,后者可能类似于以下内容(仅作为伪代码):
class StatisticsMean
{
private:
int n = 0;
double mean = 0.0;
public:
void addSample(int s) { ++n; mean += (s - mean) / n; }
void removeSample(int s) { ... }
double getStatistic() const { return mean; }
}
template <typename TStatistics>
class BufferQueue
{
TStatistics statistics;
...
void enqueue(int val)
{
...
statistics.addSample(val);
}
...
double getStatistic() const { return statistics.getStatistic(); }
}
模板方法为您提供了完整的编译时优化。您可以使用 Template Method 模式实现相同的效果。这也将允许您为吸气剂使用不同的名称(上例中的getStatistic()
)。
这看起来可能与此类似:
class AbstractBufferQueue
{
virtual void addSample(int s) = 0;
virtual void removeSample(int s) = 0;
void enqueue(int val)
{
...
addSample(val);
}
}
class BufferQueueAverage : public AbstractBufferQueue
{
int n;
double mean;
void addSample(int s) { ++n; mean += (s - mean) / n; }
void removeSample(int s) { ... }
double getAverage() const { return mean; }
}
完成您要求的一种方法是使用模板 类。
首先,确定 累加器 将具有的通用接口。它可能是这样的:
class accumulator
{
public:
typedef double value_type;
public:
void push(int v); // Called when pushing a new value.
void pop(int v); // Called when popping a new value;
value_type result(size_t n) const; // Returns the current accumulation.
};
作为一个特例,mean_accumulator
可能是这样的:
class mean_accumulator
{
public:
typedef double value_type;
public:
mean_accumulator() : m_sum{0} {}
void push(int v) { m_sum += v; }
void pop(int v); { m_sum -= v; }
double result(size_t n) const { return m_sum / n; };
private:
int m_sum;
};
现在,通过 Accumulator
参数化您的队列,并在必要时调用它(当您使用它时,请注意 boost::circular_buffer
具有实现所需的大部分内容:
template<class Accumulator>
class queue
{
private:
boost::circular_buffer<int> m_buf;
std::mutex m_m;
public:
void push(int v)
{
// Lock the mutex, push to the circular buffer, and the accumulator
}
bool pop()
{
// Lock the mutex; if relevant, update the accumulator and pop the circular buffer
}
typename Accumulator::value_type result() const
{
// Lock the mutex and return the accumulator's result.
}
};