用于顺序处理的高效通用缓冲队列

Efficient generic buffer queue for sequential processing

我有一个由并行程序更新的生产者-消费者队列。查询队列以获取有关当前队列内容的各种统计信息,例如均值或标准差或方差或其他内容。意思是,这是代码,我使用

class BufferQueue {
    const int nMaxQueueSize_;
    int* values;
    int head, tail;
    double sum;
    ::utils::FastMutex queue_mutex;

public:

    BufferQueue(const int nMaxQueueSize) :
    nMaxQueueSize_(nMaxQueueSize) {
        head = tail = 0;
        sum = 0;
        values = new int[nMaxQueueSize_];
    }

    void enqueue(int val) {
        values[head] = val;
        if ((head + 1) % nMaxQueueSize_ == tail) {
            queue_mutex.lock();
            sum = val.value_point - values[tail].value_point;
            utils::memory_barrier();
            head = (1 + head) % nMaxQueueSize_;
            tail = (1 + tail) % nMaxQueueSize_;
            queue_mutex.unlock();
        } else {
            queue_mutex.lock();
            sum += val.value_point;
            utils::memory_barrier();
            head = (1 + head) % nMaxQueueSize_;
            queue_mutex.unlock();
        }
    }

    bool dequeue() {
        if (head != tail) {
            queue_mutex.lock();
            sum -= values[tail].value_point;
            utils::memory_barrier();
            tail = (1 + tail) % nMaxQueueSize_;
            queue_mutex.unlock();
            return true;
        } else {
            sum = 0;
            return false;
        }
    }

    MarketSpreadPoint& operator[](int i) {
        return values[ (tail + i) % nMaxQueueSize_ ];
    }

    inline int getSize() {
        return (head - tail + nMaxQueueSize_) % nMaxQueueSize_;
    }

    inline double average() {
        queue_mutex.lock();
        double result = sum / getSize();
        queue_mutex.unlock();
        return result;
    }

    ~BufferQueue() {
        delete values;
    }
};

注意:要记住的一件重要事情是只执行一个操作。我也不想通过编写单独的实现来重复代码,例如 BufferQueueAverageBufferQueueVariance 等。我想要非常有限的代码冗余(编译器优化)。即使对每次更新的队列类型进行调节似乎也不是最佳选择。

    inline double average() {
        queue_mutex.lock();
        if(type_is_average){
            double result = sum / getSize();
        }else if(type_is_variance){
            /// update accordingly.
        }
        double result = sum / getSize();
        queue_mutex.unlock();
        return result;
    }

什么可以替代这个想法?

注意:在这个实现中,如果队列满了,head会自动让tail向前移动。换句话说,最旧的元素被自动删除。

谢谢

所以你想把队列和统计分开。我看到两种可能的解决方案:

  1. 使用像模板方法策略这样的模式来分解依赖关系。
  2. 使用执行此操作的模板。

假设您收集的所有统计数据都可以 gathered incrementally,后者可能类似于以下内容(仅作为伪代码):

class StatisticsMean
{
private:
    int n = 0;
    double mean = 0.0;
public: 
    void addSample(int s) { ++n; mean += (s - mean) / n; }
    void removeSample(int s) { ... }
    double getStatistic() const { return mean; }
}

template <typename TStatistics>
class BufferQueue 
{
    TStatistics statistics;
    ...

    void enqueue(int val) 
    {
        ...
        statistics.addSample(val);
    }
    ...
    double getStatistic() const { return statistics.getStatistic(); }
}

模板方法为您提供了完整的编译时优化。您可以使用 Template Method 模式实现相同的效果。这也将允许您为吸气剂使用不同的名称(上例中的getStatistic())。

这看起来可能与此类似:

class AbstractBufferQueue 
{
    virtual void addSample(int s) = 0;
    virtual void removeSample(int s) = 0;

    void enqueue(int val) 
    {
        ...
        addSample(val);
    }
}

class BufferQueueAverage : public AbstractBufferQueue
{
    int n;
    double mean;

    void addSample(int s) { ++n; mean += (s - mean) / n; }
    void removeSample(int s) { ... }
    double getAverage() const { return mean; }
}

完成您要求的一种方法是使用模板 类。

首先,确定 累加器 将具有的通用接口。它可能是这样的:

class accumulator
{
public:
    typedef double value_type;

public:
    void push(int v); // Called when pushing a new value.
    void pop(int v); // Called when popping a new value;
    value_type result(size_t n) const; // Returns the current accumulation.
};

作为一个特例,mean_accumulator可能是这样的:

class mean_accumulator
{
public:
    typedef double value_type;

public:
    mean_accumulator() : m_sum{0} {}

    void push(int v) { m_sum += v; }
    void pop(int v); { m_sum -= v; }
    double result(size_t n) const { return m_sum / n; };

private:
    int m_sum;
};

现在,通过 Accumulator 参数化您的队列,并在必要时调用它(当您使用它时,请注意 boost::circular_buffer 具有实现所需的大部分内容:

template<class Accumulator>
class queue
{
private:
    boost::circular_buffer<int> m_buf;
    std::mutex m_m;

public:
    void push(int v) 
    {
       // Lock the mutex, push to the circular buffer, and the accumulator
    }

    bool pop() 
    {
       // Lock the mutex; if relevant, update the accumulator and pop the circular buffer
    }

    typename Accumulator::value_type result() const
    {
       // Lock the mutex and return the accumulator's result.
    }
};