高效平均(移动平均)

Efficient averaging (moving average)

我有给定(恒定)频率的数据流(整数)。有时我需要计算不同的平均值(预定义)。我正在寻找快速高效的解决方案。

假设:

我现在拥有的:

为了让您更深入地了解,我现在有一个代码:

public class Buffer<T> : LinkedList<T>
{
    private readonly int capacity;

    public bool IsFull => Count >= capacity;

    public Buffer(int capacity)
    {
        this.capacity = capacity;
    }

    public void Enqueue(T item)
    {
        if (Count == capacity)
        {
            RemoveFirst();
        }
        AddLast(item);
    }
}


public class MovingAverage
{
    private readonly Buffer<float> Buffer;
    private static readonly object bufferLock = new object();
    public Dictionary<string, float> Sums { get; private set; }
    public Dictionary<string, int> Counts { get; private set; }

    public MovingAverage(List<int> sampleCounts, List<string> names)
    {
        if (sampleCounts.Count != names.Count)
        {
            throw new ArgumentException("Wrong Moving Averages parameters");
        }
        Buffer = new Buffer<float>(sampleCounts.Max());

        Sums = new Dictionary<string, float>();
        Counts = new Dictionary<string, int>();

        for (int i = 0; i < names.Count; i++)
        {
            Sums[names[i]] = 0;
            Counts[names[i]] = sampleCounts[i];
        }
    }


    public void ProcessAveraging(float val)
    {
        lock (bufferLock)
        {
            if (float.IsNaN(val))
            {
                val = 0;
            }
            foreach (var keyVal in Counts.OrderBy(a => a.Value))
            {
                Sums[keyVal.Key] += val;
                if (Buffer.Count >= keyVal.Value)
                {
                    Sums[keyVal.Key] -= Buffer.ElementAt(Buffer.Count - keyVal.Value);
                }

            }
            Buffer.Enqueue(val);
        }
    }

    public float GetLastAverage(string averageName)
    {
        lock (bufferLock)
        {
            if (Buffer.Count >= Counts[averageName])
            {
                return Sums[averageName] / Counts[averageName];
            }
            else
            {
                return Sums[averageName] / Buffer.Count;
            }
        }
    }
}

这真的很好用而且速度足够快,但在现实世界中,拥有 100 SPS 并不意味着您将始终在 1 秒内拥有 100 个样本。有时是 100,有时是 99,有时是 101。计算这些平均值对我的系统至关重要,1 个样本或多或少可能会发生很大变化。这就是为什么我需要一个实时计时器来告诉我样本是否已经超出移动平均线 window。

为每个样本添加时间戳的想法似乎很有希望

创建一个大小为 500 的数组,int counter c

For every sample:
    summ -= A[c % 500]  //remove old value
    summ += sample 
    A[c % 500] = sample  //replace it with new value
    c++
    if needed, calculate
        average = summ / 500

您总是希望删除序列一侧最旧的元素并在序列的另一侧添加新元素:您需要队列而不是堆栈。

我认为圆列表会更快:只要你没有最大尺寸,就添加元素,一旦你达到最大尺寸,替换最旧的元素。

这似乎是一个很好的可重复使用 class。稍后我们将添加移动平均线部分。

class RoundArray<T>
{
    public RoundArray(int maxSize)
    {
        this.MaxSize = maxSize;
        this.roundArray = new List<T>(maxSize);
    }

    private readonly int maxSize;
    private readonly List<T> roundArray;
    public int indexOldestItem = 0;

    public void Add(T item)
    {
        // if list not full, just add
        if (this.roundArray.Count < this.maxSize)
            this.roundArray.Add(item);
        else
        {
            // list is full, replace the oldest item:
            this.roundArray[oldestItem] = item;
            oldestItem = (oldestItem + 1) % this.maxSize;
        } 

        public int Count => this.roundArray.Count;
        public T Oldest => this.roundArray[this.indexOldestItem];               
    }
}

要使此 class 有用,请添加枚举数据的方法,从最旧或最新开始,考虑添加其他有用的可重用方法。也许你应该实现 IReadOnlyCollection<T>。也许一些私有字段应该有 public 属性。

您的移动平均计算器将使用此 RoundArray。每当添加一个项目,并且您的 roundArray 尚未满时,该项目就会添加到总和和圆形数组中。

如果 roundArray 已满,则该项目将替换最旧的项目。您从 Sum 中减去 OldestItem 的值,然后将新 Item 添加到 Sum。

class MovingAverageCalculator
{
    public MovingAverageCalculator(int maxSize)
    {
        this.roundArray = new RoundArray<int>(maxSize);
    }

    private readonly RoundArray<int> roundArray;
    private int sum = 0;

    private int Count => this.RoundArray.Count;
    private int Average => this.sum / this.Count;

    public voidAdd(int value)
    {
        if (this.Count == this.MaxSize)
        {
            // replace: remove the oldest value from the sum and add the new one
            this.Sum += value - this.RoundArray.Oldest;
        }
        else
        {
            // still building: just add the new value to the Sum
            this.Sum  += value;
        }
        this.RoundArray.Add(value);
    }
}

累计金额。

为每块大约 1000 个元素计算一系列累积和1。 (可能会更少,但是 500 或 1000 差别不大,这样会更舒服)只要内部至少有一个元素相关,您就希望保留每个块。然后就可以回收了。2

当您需要当前总和且您在一个街区内时,您想要的总和为:
block[max_index] - block[last_relevant_number].

如果你处于两个区块的交界处b1,b2,你想要的总和是:
b1[b1.length - 1] - b1[last_relevant_number] + b2[max_index]

我们完成了。这种方法的主要优点是您不需要事先知道要保留多少元素,并且可以随时计算结果。
您也不需要处理元素的删除,因为当您回收段时您会自然地覆盖它们 - 保留索引就是您所需要的。

示例:让我们有一个常数时间序列 ts = [1,1,1, .... 1]。该系列的累计总和将为 cumsum = [1,2,3 ... n]。 ts 的第 i 个到第 j 个(含)元素的总和将为 cumsum[j] - cumsum[i - 1] = j - i - 1。对于 i = 5,j = 6,它将是 6 - 4 = 2 这是正确的。


1 对于数组 [1,2,3,4,5],这些将是 [1,3,6,10,15] - 只是为了完整性。
2 既然你提到了 ~500 个元素,那么两个块就足够了。

我不会使用链表,而是使用一些内部函数作为数组副本。在这个答案中,我包括了对你的缓冲区 class 的可能重写。接手每一个位置求和的想法

这个缓冲区跟踪所有的总和,但为了做到这一点,它需要用新值对每个项目求和。根据您需要获得该平均值的频率,最好在需要时进行总结并仅保留单个值。

无论如何,我只是想指出如何使用 Array.Copy

public class BufferSum
{
    private readonly int _capacity;
    private readonly int _last;
    private float[] _items;

    public int Count { get; private set; }

    public bool IsFull => Count >= _capacity;

    public BufferSum(int capacity)
    {
        _capacity = capacity;
        _last = capacity - 1;
        _items = new float[_capacity];
    }

    public void Enqueue(float item)
    {
        if (Count == _capacity)
        {
            Array.Copy(_items, 1, _items, 0, _last);
            _items[_last] = 0;
        }
        else
        {
            Count++;
        }

        for (var i = 0; i < Count; i ++)
        {
            _items[i] += item;
        }
    }

    public float Avarage => _items[0] / Count;

    public float AverageAt(int ms, int fps)
    {
        var _pos = Convert.ToInt32(ms / 1000 * fps);
        return _items[Count - _pos] / _pos; 
    }
}

另外要小心 lock 语句,这会花费很多时间。

这里有很多答案..不妨再添加一个:)

这个可能需要一些小的调试才能“关闭一个”等 - 我没有一个真正的数据集可以使用所以也许把它当作伪代码

就像你的一样:有一个环形缓冲区 - 给它足够的容量来容纳 N 个样本,其中 N 足以检查你的移动平均线 - 100 SPS 并且想要检查 250ms 我认为你至少需要 25 , 但我们并不缺少 space 所以你可以做得更多

struct Cirray
{
    long _head;
    TimedFloat[] _data;

    public Cirray(int capacity)
    {
        _head = 0;
        _data = new TimedFloat[capacity];
    }

    public void Add(float f)
    {
        _data[_head++%_data.Length] = new TimedFloat() { F = f };
    }

    public IEnumerable<float> GetAverages(int[] forDeltas)
    {
        double sum = 0;
        long start = _head - 1;
        long now = _data[start].T;
        int whichDelta = 0;

        for (long idx = start; idx >= 0 && whichDelta < forDeltas.Length; idx--)
        {
            if (_data[idx % _data.Length].T < now - forDeltas[whichDelta])
            {
                yield return (float)(sum / (start - idx));
                whichDelta++;
            }

            sum += _data[idx % _data.Length].F;
        }
    }
}

struct TimedFloat
{
    [DllImport("Kernel32.dll", CallingConvention = CallingConvention.Winapi)]
    private static extern void GetSystemTimePreciseAsFileTime(out long filetime);


    private float _f;
    public float F { get => _f;
        set {
            _f = value;
            GetSystemTimePreciseAsFileTime(out long x);
            T = DateTime.FromFileTimeUtc(x).Ticks;
        }
    }
    public long T;

}

正常的 DateTime.UtcNow 不是很精确 - 大约 16 毫秒 - 所以如果你说即使是一个样本也可能将其丢弃,那么它可能不适用于像这样的时间戳数据。相反,我们可以做到这一点,这样我们就可以获得与高分辨率计时器等效的滴答声,如果您的系统支持它(如果不支持,您可能必须更改系统,或者滥用 StopWatch class 来提供更高分辨率的补充)和我们正在为每个数据项添加时间戳。

我考虑过维护 N 个不断移动的指向数据尾端的指针和 dec/incrementing N 个总和的复杂性 - 它仍然可以完成(而且你清楚地知道如何)但是您的问题读起来好像您可能很少要求平均值,以至于 N sums/counts 解决方案将花费更多时间来维护计数,而不是 运行 时不时通过 250 或 500 个浮点数然后把它们加起来。结果,GetAverages 获取您想要数据范围的 ticks(每毫秒 10,000)的数组,例如new[] { 50 * 10000, 100 * 10000, 150 * 10000, 200 * 10000, 250 * 10000 } 50 毫秒到 250 毫秒,步长为 50,它从当前头部开始并向后求和,直到它要打破时间边界的点(这可能是偏离一位)产生该时间跨度的平均值,然后为下一个时间跨度恢复求和和计数(开始时的数学减去当前索引给出的计数)。我想我理解你想要的是正确的,例如“过去 50 毫秒的平均值”和“过去 100 毫秒的平均值”,而不是“最近 50 毫秒的平均值”和“最近 50 毫秒的平均值”

编辑:

想了想,做了这个:

构造 Cirray { 长头; TimedFloat[] _data; RunningAverage[] _ravgs;

    public Cirray(int capacity)
    {
        _head = 0;
        _data = new TimedFloat[capacity];
    }

    public Cirray(int capacity, int[] deltas) : this(capacity)
    {
        _ravgs = new RunningAverage[deltas.Length];
        for (int i = 0; i < deltas.Length; i++)
            _ravgs[i] = new RunningAverage() { OverMilliseconds = deltas[i] };
    }

    public void Add(float f)
    {
        //in c# every assignment returns the assigned value; capture it for use later
        var addedTF = (_data[_head++ % _data.Length] = new TimedFloat() { F = f });

        if (_ravgs == null)
            return;

        foreach (var ra in _ravgs)
        {
            //add the new tf to each RA
            ra.Count++;
            ra.Total += addedTF.F;

            //move the end pointer in the RA circularly up the array, subtracting/uncounting as we go
            var boundary = addedTF.T - ra.OverMilliseconds; 
            while (_data[ra.EndPointer].T < boundary) //while the sample is timed before the boundary, move the
            {
                ra.Count--; 
                ra.Total -= _data[ra.EndPointer].F;
                ra.EndPointer = (ra.EndPointer + 1) % _data.Length; //circular indexing
            }
        }

    }

    public IEnumerable<float> GetAverages(int[] forDeltas)
    {
        double sum = 0;
        long start = _head - 1;
        long now = _data[start].T;
        int whichDelta = 0;

        for (long idx = start; idx >= 0 && whichDelta < forDeltas.Length; idx--)
        {
            if (_data[idx % _data.Length].T < now - forDeltas[whichDelta])
            {
                yield return (float)(sum / (start - idx));
                whichDelta++;
            }

            sum += _data[idx % _data.Length].F;
        }
    }

    public IEnumerable<float> GetAverages() //from the built ins
    {
        foreach (var ra in _ravgs)
        {
            if (ra.Count == 0)
                yield return 0;
            else
                yield return (float)(ra.Total / ra.Count);
        }
    }
}

绝对没有测试过,但是在评论中体现了我的想法