Calculate max on a sliding window for TimeSeries

Input:

    public class MyObject
    {
        public double Value { get; set; }
        public DateTime Date { get; set; }
    }

A method to generate test objects:

public static MyObject[] GetTestObjects()
{
    var rnd = new Random();
    var date = new DateTime(2021, 1, 1, 0, 0, 0);
    var result = new List<MyObject>();
    for (int i = 0; i < 50000; i++)
    {
        //this is to simulate real data having gaps
        if (rnd.Next(100) < 25)
        {
            continue;
        }
        var myObject = new MyObject()
        {
            Value = rnd.NextDouble(),
            Date = date.AddMinutes(15 * i)
        };
        result.Add(myObject);
    }
    return result.ToArray();
}

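Given this, I need to calculate, for each myObject, the maximum value over the preceding 12 months. The only thing I could think of is doing it in parallel, but maybe there is an optimized solution?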

Sorry for being unclear. This is what I currently use to get what I want:

        public MyObject[] BruteForceBackward(MyObject[] testData)
        {
            return testData.AsParallel().Select(point =>
            {
                var max = testData.Where(x => x.Date <= point.Date && x.Date >= point.Date.AddYears(-1)).Max(x => x.Value);
                return new MyObject() { Date = point.Date, Value = point.Value / max };
            }).OrderBy(r => r.Date).ToArray();
        }

This works, but it is slow and eats processor resources (imagine you have 100k objects). I believe there must be something better.

Assuming you mean that you need the maximum Value for each month of the past 12 months in result, you can use LINQ:

var beginDateTime = DateTime.Now.AddMonths(-12);
var ans = result.Where(r => r.Date >= beginDateTime).GroupBy(r => r.Date.Month).Select(mg => mg.MaxBy(r => r.Value)).ToList();

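Running some timings, I found that putting AsParallel after result changes the run time from about 16 ms (first run) to about 32 ms, so it is actually slower. Putting it after Where gives roughly the same result, and putting it after GroupBy (processing the 12 groups in parallel) takes about 23 ms. At least on my PC, there is not enough data and the operations are not complex enough for parallelism to pay off, but GroupBy is not the most efficient approach either.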

Using an array and testing each element, I got the result in about 1.2 ms:

var maxMOs = new MyObject[12];
foreach (var r in result.Where(r => r.Date >= beginDateTime)) {
    var monthIndex = r.Date.Month-1;
    if (maxMOs[monthIndex] == null || r.Value > maxMOs[monthIndex].Value)
        maxMOs[monthIndex] = r;
}

Note that the results are not in chronological order. If needed, you can offset monthIndex by today's month to order the results:

var maxMOs = new MyObject[12];
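var offset = DateTime.Now.Month + 1; // the month after the current one maps to index 0
foreach (var r in result.Where(r => r.Date >= beginDateTime)) {
    // wrap around so the index stays in 0..11 and the current month lands at index 11
    var monthIndex = (r.Date.Month - offset + 12) % 12;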
    if (maxMOs[monthIndex] == null || r.Value > maxMOs[monthIndex].Value)
        maxMOs[monthIndex] = r;
}

A micro-optimization (mainly for repeated runs) is to invert the test and use the null-conditional operator:

if (!(r.Value <= maxMOs[monthIndex]?.Value))

This saves about 0.2 ms on the first run, and up to 0.5 ms on subsequent runs.

I had a similar project where I had to calculate this kind of thing on large amounts of sensor data.

In general, you want to reduce the number of loops that go through all of your data. At best, you want to touch each element only once.

Processing an array (equivalent of BruteForceBackward)

public static MyObject[] FlowThroughForward(ref MyObject[] testData)
{
    // generate return array
    MyObject[] returnData = new MyObject[testData.Length];
    // keep track to minimize processing
    double currentMaximum = 0;
    List<MyObject> maximumValues = new List<MyObject>();
    // go through the elements
    for (int i = 0; i < testData.Length; i++)
    {
        // calculate the oldest date to keep in tracking list
        DateTime targetDate = testData[i].Date.AddYears(-1);
        // maximum logic
        if (testData[i].Value >= currentMaximum)
        {
            // new maximum found, clear tracking list
            // this is the best case scenario
            maximumValues.Clear();
            currentMaximum = testData[i].Value;
        }
        else
        {
            // unfortunately, no new maximum was found
            // go backwards the maximum tracking list and check for smaller values
            // clear the list of all smaller values. The list should therefore always
            // be in descending order
            for (int b = maximumValues.Count - 1; b >= 0; b--)
            {
                if (maximumValues[b].Value <= testData[i].Value)
                {
                    // a lower value has been found. We have a newer, higher value
                    // clear this waste value from the tracking list
                    maximumValues.RemoveAt(b);
                }
                else
                {
                    // there are no more lower values. 
                    // stop looking for smaller values to save time
                    break;
                }
            }
        }
        // append new value to tracking list, no matter if higher or lower
        // all future values might be lower
        maximumValues.Add(testData[i]);
        // check if the oldest value is too old to be kept in the tracking list
        while (maximumValues[0].Date < targetDate)
        {
            // oldest value is to be removed
            maximumValues.RemoveAt(0);
            // update maximum
            currentMaximum = maximumValues[0].Value;
        }
        // add object to result list
        returnData[i] = new MyObject() { Date = testData[i].Date, Value = testData[i].Value / currentMaximum };
    }
    return returnData;
}

Real-time or streaming data

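Note: if your list is very large, passing the full array may cause memory issues. In that case, pass the values one at a time, from the oldest to the newest, and store the results one at a time. This function can also be used on real-time data.
The test method is included in the code.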

static void Main(string[] args)
{
    int length = 50000;
    
    Stopwatch stopWatch1 = new Stopwatch();
    stopWatch1.Start();
    var result = new List<MyObject>();
    var date = new DateTime(2021, 1, 1, 0, 0, 0);
    for (int i = 0; i < length; i++)
    {
        //this is to simulate real data having gaps
        if (rnd.Next(100) < 25)
        {
            continue;
        }
        // create a fresh instance per sample: CalculateNextObject keeps a
        // reference in its tracking list, so a reused object would corrupt it
        var myObject = new MyObject
        {
            Value = rnd.NextDouble(),
            Date = date.AddMinutes(15 * i)
        };
        result.Add(CalculateNextObject(ref myObject));
    }
    stopWatch1.Stop();
    Console.WriteLine("test code executed in " + stopWatch1.ElapsedMilliseconds + " ms");
    Thread.Sleep(1000000);
}

private static Random rnd = new Random();
private static double currentMaximum = 0;
private static List<MyObject> maximumValues = new List<MyObject>();
public static MyObject CalculateNextObject(ref MyObject input)
{
        // calculate the oldest date to keep in tracking list
        DateTime targetDate = input.Date.AddYears(-1);
        // maximum logic
        if (input.Value >= currentMaximum)
        {
            // new maximum found, clear tracking list
            // this is the best case scenario
            maximumValues.Clear();
            currentMaximum = input.Value;
        }
        else
        {
            // unfortunately, no new maximum was found
            // go backwards the maximum tracking list and check for smaller values
            // clear the list of all smaller values. The list should therefore always
            // be in descending order
            for (int b = maximumValues.Count - 1; b >= 0; b--)
            {
                if (maximumValues[b].Value <= input.Value)
                {
                    // a lower value has been found. We have a newer, higher value
                    // clear this waste value from the tracking list
                    maximumValues.RemoveAt(b);
                }
                else
                {
                    // there are no more lower values. 
                    // stop looking for smaller values to save time
                    break;
                }
            }
        }
        // append new value to tracking list, no matter if higher or lower
        // all future values might be lower
        maximumValues.Add(input);
        // check if the oldest value is too old to be kept in the tracking list
        while (maximumValues[0].Date < targetDate)
        {
            // oldest value is to be removed
            maximumValues.RemoveAt(0);
            // update maximum
            currentMaximum = maximumValues[0].Value;
        }
    // add object to result list
    MyObject returnData = new MyObject() { Date = input.Date, Value = input.Value / currentMaximum };
    return returnData;
}

Test method

static void Main(string[] args)
{
    MyObject[] testData = GetTestObjects();
    Stopwatch stopWatch1 = new Stopwatch();
    Stopwatch stopWatch2 = new Stopwatch();
    stopWatch1.Start();
    MyObject[] testresults1 = BruteForceBackward(testData);
    stopWatch1.Stop();
    Console.WriteLine("BruteForceBackward executed in " + stopWatch1.ElapsedMilliseconds + " ms");
    stopWatch2.Start();
    MyObject[] testresults2 = FlowThroughForward(ref testData);
    stopWatch2.Stop();
    Console.WriteLine("FlowThroughForward executed in " + stopWatch2.ElapsedMilliseconds + " ms");
    Console.WriteLine();
    Console.WriteLine("Comparing some random test results: ");
    var rnd = new Random();
    for (int i = 0; i < 10; i++)
    {
        int index = rnd.Next(0, testData.Length);
        Console.WriteLine("Index: " + index + " brute: " + testresults1[index].Value + " flow: " + testresults2[index].Value);
    }
    Thread.Sleep(1000000);
}

Test results

The tests were run on a 32-core machine, so in theory the multithreaded approach should have the advantage, but you will see ;)

Function             Time      Share of total time
BruteForceBackward   5334 ms   99.9%
FlowThroughForward   5 ms      0.094%

Performance improvement: run time reduced by a factor of roughly 1000.

Console output with data validation:

BruteForceBackward executed in 5264 ms
FlowThroughForward executed in 5 ms

Comparing some random test results:
Index: 25291 brute: 0.989688139105413 flow: 0.989688139105413
Index: 11945 brute: 0.59670821976193 flow: 0.59670821976193
Index: 30282 brute: 0.413238225210297 flow: 0.413238225210297
Index: 33898 brute: 0.38258761939139 flow: 0.38258761939139
Index: 8824 brute: 0.833512217105447 flow: 0.833512217105447
Index: 22092 brute: 0.648052464067263 flow: 0.648052464067263
Index: 24633 brute: 0.35859417692481 flow: 0.35859417692481
Index: 24061 brute: 0.540642018793402 flow: 0.540642018793402
Index: 34219 brute: 0.498785766613022 flow: 0.498785766613022
Index: 2396 brute: 0.151471808392111 flow: 0.151471808392111

CPU usage is much higher with BruteForceBackward because of the parallelization.

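The worst case is a long period of decreasing values. The code can still be optimized considerably, but I guess this should be sufficient. For further optimization, one might look at reducing the list shuffling that happens when elements are removed from or added to maximumValues.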
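
One way the list shuffling could be avoided is to keep the tracking list in a double-ended structure, so that removing from the front no longer shifts every remaining element. The following is only a sketch under assumptions, not the code that was benchmarked above: the class name SlidingMaxSketch, the method TrailingMax, and the windowStart delegate are hypothetical names, and the input is assumed to be sorted by ascending date. It uses a LinkedList as the deque.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical sketch: same idea as FlowThroughForward, but the tracking
// list is a LinkedList so removals at both ends are O(1).
public static class SlidingMaxSketch
{
    // For each point, returns the maximum value inside the trailing window
    // [windowStart(point.Date), point.Date]. Assumes ascending dates.
    public static double[] TrailingMax(
        IReadOnlyList<(DateTime Date, double Value)> points,
        Func<DateTime, DateTime> windowStart)
    {
        var result = new double[points.Count];
        // Candidates are kept in descending order of value, oldest first.
        var candidates = new LinkedList<(DateTime Date, double Value)>();
        for (int i = 0; i < points.Count; i++)
        {
            var current = points[i];
            // Drop candidates at the back that can never be the maximum again,
            // because a newer value is at least as large.
            while (candidates.Count > 0 && candidates.Last.Value.Value <= current.Value)
            {
                candidates.RemoveLast();
            }
            candidates.AddLast(current);
            // Drop candidates at the front that have left the window.
            // The loop always stops, because the current point is in the window.
            DateTime oldestAllowed = windowStart(current.Date);
            while (candidates.First.Value.Date < oldestAllowed)
            {
                candidates.RemoveFirst();
            }
            // The front of the deque is the window maximum.
            result[i] = candidates.First.Value.Value;
        }
        return result;
    }
}
```

For the 12-month window from the question, the delegate would be d => d.AddYears(-1). Whether this is measurably faster than the List version depends on the data, since the tracking list is often short.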

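Here is a solution similar to julian bechtold's answer. The difference is that the maximum (and all related variables) is hidden away from the main implementation, in a separate class whose sole purpose is to keep track of the maximum over the past year. The algorithm is the same; I just use a few Linq expressions here and there.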

We keep track of the maximum in the following class:

        public class MaxSlidingWindow
        {
            private readonly List<MyObject> _maximumValues;
            private double _max;

            public MaxSlidingWindow()
            {
                _maximumValues = new List<MyObject>();
                _max = double.NegativeInfinity;
            }

            public double Max => _max;
            
            public void Add(MyObject myObject)
            {
                if (myObject.Value >= _max)
                {
                    _maximumValues.Clear();
                    _max = myObject.Value;
                }
                else
                {
                    RemoveValuesSmallerThan(myObject.Value);
                }

                _maximumValues.Add(myObject);
                RemoveObservationsBefore(myObject.Date.AddYears(-1));

                _max = _maximumValues[0].Value;
            }

            private void RemoveObservationsBefore(DateTime targetDate)
            {
                var toRemoveFromFront = 0;
                // check the bounds first, then the date, so the indexer never runs past the end
                while (toRemoveFromFront < _maximumValues.Count && _maximumValues[toRemoveFromFront].Date < targetDate)
                {
                    toRemoveFromFront++;
                }

                _maximumValues.RemoveRange(0, toRemoveFromFront);
            }

            private void RemoveValuesSmallerThan(double targetValue)
            {
                var maxEntry = _maximumValues.Count - 1;
                var toRemoveFromBack = 0;
                while (toRemoveFromBack <= maxEntry && _maximumValues[maxEntry - toRemoveFromBack].Value <= targetValue)
                {
                    toRemoveFromBack++;
                }

                _maximumValues.RemoveRange(maxEntry - toRemoveFromBack + 1, toRemoveFromBack);
            }
        }

It can be used like this:

        public static MyObject[] GetTestObjects_MaxSlidingWindow()
        {
            var rnd = new Random();
            var date = new DateTime(2021, 1, 1, 0, 0, 0);
            var result = new List<MyObject>();
            var maxSlidingWindow = new MaxSlidingWindow();
            for (int i = 0; i < 50000; i++)
            {
                //this is to simulate real data having gaps
                if (rnd.Next(100) < 25)
                {
                    continue;
                }
                var myObject = new MyObject()
                {
                    Value = rnd.NextDouble(),
                    Date = date.AddMinutes(15 * i)
                };
                
                maxSlidingWindow.Add(myObject);
                var max = maxSlidingWindow.Max;
                result.Add(new MyObject { Date = myObject.Date, Value = myObject.Value / max });
            }
            return result.ToArray();
        }

See the relative timings below. The solution above is slightly faster (measured over 10 million runs), but the difference is hardly noticeable:

Relative timings

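An interesting and challenging problem. I put together a solution using a dynamic programming approach (first learned in a CS algorithms class back in '78). First, a tree is built containing pre-calculated local max values over recursively defined ranges. Once constructed, the max value for an arbitrary range can be calculated efficiently, mostly using the pre-calculated values. Only at the fringes of the range does the calculation drop down to the element level.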

It is not as fast as julian bechtold's FlowThroughForward method, but random access to ranges may be an advantage.

Code to add to Main:

    Console.WriteLine();
    Stopwatch stopWatch3 = new Stopwatch();
    stopWatch3.Start();
    MyObject[] testresults3 = RangeTreeCalculation(ref testData, 10);
    stopWatch3.Stop();
    Console.WriteLine($"RangeTreeCalculation executed in {stopWatch3.ElapsedMilliseconds} ms");

    // ... in the test comparison loop:
    Console.WriteLine($"Index: {index} brute: {testresults1[index].Value} flow: {testresults2[index].Value} rangeTree: {testresults3[index].Value}");

Test function:

public static MyObject[] RangeTreeCalculation(ref MyObject[] testDataArray, int partitionThreshold)
{
    // For this implementation, we convert the array to a List<MyObject>, because we need a
    // reference type object that can be shared by all nodes of the tree.
    List<MyObject> testDataList = testDataArray.ToList();

    // Construct a tree containing recursive collections of pre-calculated values
    var rangeTree = new RangeTree(testDataList, partitionThreshold);

    MyObject[] result = new MyObject[testDataList.Count];
    Parallel.ForEach(testDataList, (item, state, i) =>
        {
            var max = rangeTree.MaxForDateRange(item.Date.AddYears(-1), item.Date);
            result[i] = new MyObject() { Date = item.Date, Value = item.Value / max };
        });

    return result;
}

Supporting class:

// Class used to divide and conquer using dynamic programming.
public class RangeTree
{
    public List<MyObject> Data; // This reference is shared by all members of the tree
    public int Start { get; } // Index of first element covered by this node.
    public int Count { get; } // Number of elements covered by this node.
    public DateTime FirstDateTime { get; }
    public DateTime LastDateTime { get; }
    public double MaxValue { get; }  // Pre-calculated max for all elements covered by this node.
    List<RangeTree> ChildRanges { get; }

    // Top level node constructor
    public RangeTree(List<MyObject> data, int partitionThreshold)
        : this(data, 0, data.Count, partitionThreshold)
    {
    }
    
    // Child node constructor, which covers a recursively decreasing range of elements.
    public RangeTree(List<MyObject> data, int start, int count, int partitionThreshold)
    {
        Data = data;
        Start = start;
        Count = count;
        FirstDateTime = Data[Start].Date;
        LastDateTime = Data[Start + Count - 1].Date;
        if (count <= partitionThreshold)
        {
            // If the range is smaller than the threshold, just calculate the local max
            // directly from the items. No child ranges are defined.
            MaxValue = Enumerable.Range(Start, Count).Select(i => Data[i].Value).Max();
        }
        else
        {
            // We still have a significant range. Decide how to further divide them up into sub-ranges.
            // (There may be room for improvement here to better balance the tree.)
            int partitionSize = (count - 1) / partitionThreshold + 1;
            int partitionCount = (count - 1) / partitionSize + 1;
            if (count < partitionThreshold * partitionThreshold)
            {
                // When one away from leaf nodes, prefer fewer full leaf nodes over more
                // less populated leaf nodes.
                partitionCount = (count - 1) / partitionThreshold + 1;
                partitionSize = (count - 1) / partitionCount + 1;
            }

            ChildRanges = Enumerable.Range(0, partitionCount)
                .Select(partitionNum => new {
                        ChildStart = Start + partitionNum * partitionSize,
                        ChildCount = Math.Min(partitionSize, Count - partitionNum * partitionSize)
                    })
                .Where(part => part.ChildCount > 0) // Defensive
                .Select(part => new RangeTree(Data, part.ChildStart, part.ChildCount, partitionThreshold))
                .ToList();

            // Now is the dynamic programming part:
            // Calculate the local max as the max of all child max values.
            MaxValue = ChildRanges.Max(child => child.MaxValue);
        }
    }

    // Get the max value for a given range of dates within this rangeTree node.
    // This uses the precalculated values as much as possible.
    // Only at the fringes of the date range do we calculate at the element level.
    public double MaxForDateRange(DateTime fromDate, DateTime thruDate)
    {
        double calculatedMax = Double.MinValue;
        if (fromDate > this.LastDateTime || thruDate < this.FirstDateTime)
        {
            // Entire range is excluded. Nothing of interest here folks.
            calculatedMax = Double.MinValue;
        }
        else if (fromDate <= this.FirstDateTime && thruDate >= this.LastDateTime)
        {
            // Entire range is included. Use the already-calculated max.
            calculatedMax = this.MaxValue;
        }
        else if (ChildRanges != null)
        {
            // We have child ranges. Recurse and accumulate.
            // Possible optimization: Calculate max for middle ranges first, and only bother
            // with extreme partial ranges if their local max values exceed the preliminary result.
            for (int i = 0; i < ChildRanges.Count; ++i)
            {
                double childMax = ChildRanges[i].MaxForDateRange(fromDate, thruDate);
                if (childMax > calculatedMax)
                {
                    calculatedMax = childMax;
                }
            }
        }
        else
        {
            // Leaf range. Loop through just this limited range of elements, checking individually for
            // date in range and accumulating the result.
            for (int i = 0; i < this.Count; ++i)
            {
                var element = Data[this.Start + i];
                if (fromDate <= element.Date && element.Date <= thruDate && element.Value > calculatedMax)
                {
                    calculatedMax = element.Value;
                }
            }
        }

        return calculatedMax;
    }
}

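There is a lot of room for improvement, such as parameterizing the types and generalizing the functionality to support more than just Max(Value), but the framework is there.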
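
As a rough illustration of what such a generalization might look like, here is a generic sketch. It is not the RangeTree above. It swaps in a plain array-based segment tree, which follows the same idea of pre-computing aggregates for nested ranges. The class name RangeAggregator, its constructor parameters, and the Query method are all hypothetical. It works on index ranges, so mapping a date range to indices (for example with a binary search over the sorted dates) is assumed to happen elsewhere.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical sketch: a generic, array-based segment tree.
// "combine" must be associative (Math.Max, Math.Min, addition, ...),
// and "identity" must be neutral for it (for example negative infinity for max).
public sealed class RangeAggregator<T>
{
    private readonly T[] _tree;      // leaves live at indices [_n, 2 * _n)
    private readonly int _n;
    private readonly Func<T, T, T> _combine;
    private readonly T _identity;

    public RangeAggregator(IReadOnlyList<T> items, Func<T, T, T> combine, T identity)
    {
        _n = items.Count;
        _combine = combine;
        _identity = identity;
        _tree = new T[2 * _n];
        for (int i = 0; i < _n; i++)
        {
            _tree[_n + i] = items[i];
        }
        // Pre-calculate every inner node from its two children.
        for (int i = _n - 1; i > 0; i--)
        {
            _tree[i] = combine(_tree[2 * i], _tree[2 * i + 1]);
        }
    }

    // Aggregate over the half-open index range [from, to).
    public T Query(int from, int to)
    {
        T left = _identity;
        T right = _identity;
        for (int l = from + _n, r = to + _n; l < r; l >>= 1, r >>= 1)
        {
            if ((l & 1) == 1) left = _combine(left, _tree[l++]);
            if ((r & 1) == 1) right = _combine(_tree[--r], right);
        }
        return _combine(left, right);
    }
}
```

With this shape, a max tree over doubles would be built as new RangeAggregator<double>(values, Math.Max, double.NegativeInfinity), and a sum tree would pass (a, b) => a + b with 0.0 as the identity. Each query touches O(log n) nodes.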