在 O(1) 中查找流中最后 N 个浮点数的平均值

Find Average of Last N Floats in a Stream in O(1)

让我们假设我有一个无限的浮动流 s_i。我想计算最后 10000 个值的平均值。

这个问题有一个简单而低效的解决方案。只需将最后 10000 个值存储在 list/queue 中,并通过每次迭代所有元素来计算平均值。这导致运行时间为 O(n),其中 n 是列表的长度。这个解决方案对于我的用例来说效率太低了。

这个问题的快速解决方案是引入一个变量来保存最后 10000 个元素的总和。每当一个新元素到达时,它将被添加到变量中,而最旧的(存储在队列中)将从变量中减去。这个解决方案是 O(1)。 (是的,内存仍然是 O(n),但内存根本不是问题)

但是,“快速解决方案”存在问题。由于浮点不精确和舍入误差,总和会随着时间的推移而漂移并变得不准确。 java 中的这个 MVCE 显示了问题:

float sum = 0;
List<Float> nums = new ArrayList<>();
for(int i = 1; i<=10000; i++) {
    nums.add((float) (Math.random()));
}
for(float f: nums) {
    sum += f;
}
for(float f: nums) {
    sum -= f;
}
System.out.println(sum); //Not printing Zero

请注意,使用 double 并不能解决这个问题,只会让它变得不那么重要。

那么这个问题的优雅解决方案是什么?

您无法修复浮点运算的有限精度。 为了避免漂移有两个想法:

  1. 做快版,忽略漂移。在固定数量的数字后,努力计算新的 O(n) 的孔和。这样你得到 O(1) 大多数时间但有时 O(n)。我知道这构成了一个 O(n) 算法,但有一个非常小的因素

  2. 将float转换成没有算术题的格式。类似于定点表示(可能仅适用于您的值在有限范围内的情况)

这里有两种解决方法。底部是演示每个元素 O(1) 解决方案的代码,其中每个输出仅通过对来自其相关输入的元素求和来计算。它没有从正在进行的总和中累积的舍入误差。 (这意味着它通常可以用于任何归约运算,例如查找最大值或最小值,而不仅仅是加法。)

float 格式跨越 276 位,从 2−149(次正规数的最低有效位)到 2+126(格式最大有限数的最高有效位 [MSB])。最大有限数的 10,000 倍具有 2140 的 MSB,因此和最多跨越 140 − −149 + 1 = 290 位。因此,使用一个 290 位的数组,加上一个符号位,您可以跟踪 10,000 个浮点数的精确总和,没有舍入错误。

(请注意,实现可以包含一些 slack 以避免过多进位。例如,如果将 290 位实现为十个 32 位元素的数组,则完全使用前九个元素和 2 位最后一个元素(加上另一个符号),然后,在最坏的情况下,来自低位元素的进位可能会级联所有高位元素。相反,一个人可能会使用 20 个 32 位元素,但主要只使用大约 16 个每个元素的位,允许进位在该元素内累积而不是被添加到更高的元素。只有当它积累到需要实际进位或检查高元素以读取当前总数时,才有必要协调松弛.)

第二种方法可以为 N 个元素提供 O(N) 的复杂度,因此每个元素的复杂度为 O(1)(对于固定的 window 大小)。为此,让我们从数组 II[0]I[9999] 的前 10,000 个元素开始。向后累积部分和。给定一些数组 T(这可以是临时数组,或者可以在输出数组中完成工作),设置 T[9999] = I[9999]T[9998] = T[9999] + I[9998] 等等——一般来说,T[i] = T[i+1] + I[i]。完成后,T[0] 将包含前 10,000 个输入的总和,因此这是我们的第一个输出。

现在请注意,T[1] 是第一个之后的 9,999 个输入的总和,因此 T[1] + I[10000] 是第二个输出。事实上,到目前为止计算的每个 T[i] 都是我们想要的总和的 前缀 ;它是输出总和的初始 10000−i 个元素的总和 i。那么我们要完成的就是匹配后缀。到目前为止计算的每个前缀都以 9,999 结尾;添加到其中的最后一个(索引最高的)输入元素是 I[9999]。因此,如果我们以 I[10000] 开始部分和 S,它就是我们需要添加到 T[1] 的后缀。然后将 I[10001] 添加到 S 给我们需要添加到 T[2] 的后缀,依此类推。因此,在第一个输出之后,9,999 个输出中的每一个都可以通过用新输入更新 S 并输出 ST[i] 的总和来计算。这个块的最终输出可以通过用另一个输入更新 S 并输出它来产生——此时 S 是 10,000 个元素的后缀总和,需要一个零元素的前缀,所以不需要加法任何 T[i] 是必需的。

因此,用于计算前缀的 9,999 次加法和用于更新 S 的 9,999 次加法产生了 10,0001 个输出,因此每个输出只有不到两个加法。然后可以对另一个包含 10,000 个值的块重复此过程。

输出可能仍然存在浮点运算的舍入误差,但每个输出仅由仅涉及其输入的有限多次“单向”加法构成。没有从持续的求和中减去较早的元素,随着时间的推移累积更多的舍入误差。

这是演示代码。

#include <stdlib.h>


#define Operation(a, b) ((a) + (b))


/*  Sliding window reduction.

    For 0 <= i < N, each C[i] is the result of a reduction operation of the W
    elements of A from A[i] to A[i+W-1].  The Operation macro defines the
    reduction operation.
*/
void SlidingWindowReduction(const float *A, float *C, size_t N, size_t W)
{
    //  Do whole segments of length W+1.
    for (; W < N; N -= W+1, A += W+1, C += W+1)
    {
        //  Build prefixes. C[W-w] receives Operation(A[W-w] to A[W-1]).
        float p = A[W-1];
        for (size_t w = 1; w < W; ++w)
        {
            C[W-w] = p;
            float t = A[W-1-w];
            p = Operation(p, t);
        }
        C[0] = p;

        //  Build suffixes and paste them onto prefixes.
        //  Each suffix is Operation(C[W] to C[W+w]).
        float s = A[W];
        for (size_t w = 1; w < W; ++w)
        {
            //  Paste prefix (w to W-1) with suffix (W to W+w-1).
            float p = C[w]; 
            C[w] = Operation(p, s);

            //  Build suffix.
            float t = A[W+w];
            s = Operation(s, t);
        }
        C[W] = s;
    }

    //  Do final segment.
    if (N)
    {
        //  Build prefixes. C[W-w] receives Operation(A[W-w] to A[W-1]).
        float p = A[W-1];
        for (size_t w = 1; w < W-N+1; ++w)  //  Unstored prefixes.
        {
            float t = A[W-1-w];
            p = Operation(p, t);
        }
        for (size_t w = W-N+1; w < W; ++w)  //  Stored prefixes.
        {
            C[W-w] = p;
            float t = A[W-1-w];
            p = Operation(p, t);
        }
        C[0] = p;

        //  Build suffixes and paste them onto prefixes.
        //  Each suffix is Operation(C[W] to C[W+w]).
        if (1 < N)
        {
            float s = A[W];
            for (size_t w = 1; w < N-1; ++w)
            {
                //  Paste prefix (w to W-1) with suffix (W to W+w-1).
                float p = C[w]; 
                C[w] = Operation(p, s);

                //  Build suffix.
                float t = A[W+w];
                s = Operation(s, t);
            }
            {
                float p = C[N-1];
                C[N-1] = Operation(p, s);
            }
        }
    }
}


#include <stdio.h>


#define NumberOf(a) (sizeof (a) / sizeof *(a))


int main(void)
{
    enum { W = 3 }; //  Number of elements in window.

    float A[] = { 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12 };
    float C[NumberOf(A) - W + 1] = { 0 };

    SlidingWindowReduction(A, C, NumberOf(C), 3);

    for (size_t c = 0; c < NumberOf(C); ++c)
        printf("C[%zu] = %g.\n", c, C[c]);
}

可以在不使用减法的情况下以硬实时方式保持 windowed 平均值。在下面未经测试的代码中,a 存储了 window 中最近最少添加的元素的后缀和,b 存储了下一个后缀和(正在构建),而 c存储最近添加的元素。 (实际上可以对 ac 使用相同的数组,但这确实会使代码更难理解。)

public final class WindowedAverage {
  public WindowedAverage(final int n) {
    assert n > 0;
    scalar = 1 / (float) n;
    assert n % 2 == 0;
    half_n = n / 2;
    a = new float[half_n];
    b = new float[half_n];
    b_sum = 0;
    c = new float[half_n];
    c_sum = 0;
    i = 0;
  }

  public float average() {
    return a[i] + b_sum + c_sum;
  }

  void add(final float s) {
    final float term = scalar * s;
    c[i] = term;
    c_sum += term;
    i++;
    if (i < half_n) {
      b[half_n - (i + 1)] += b[half_n - i];
    } else {
      float[] temp = a;
      a = b;
      b = c;
      b_sum = c_sum;
      c = temp;
      c_sum = 0;
      i = 0;
    }
  }

  private final float scalar;
  private final int half_n;
  private final float[] a;
  private final float[] b;
  private float b_sum;
  private final float[] c;
  private float c_sum;
  private int i;
}