在 O(1) 中查找流中最后 N 个浮点数的平均值
Find Average of Last N Floats in a Stream in O(1)
让我们假设我有一个无限的浮动流 s_i
。我想计算最后 10000 个值的平均值。
这个问题有一个简单而低效的解决方案。只需将最后 10000 个值存储在 list/queue 中,并通过每次迭代所有元素来计算平均值。这导致运行时间为 O(n),其中 n 是列表的长度。这个解决方案对于我的用例来说效率太低了。
这个问题的快速解决方案是引入一个变量来保存最后 10000 个元素的总和。每当一个新元素到达时,它将被添加到变量中,而最旧的(存储在队列中)将从变量中减去。这个解决方案是 O(1)。 (是的,内存仍然是 O(n),但内存根本不是问题)
但是,“快速解决方案”存在问题。由于浮点不精确和舍入误差,总和会随着时间的推移而漂移并变得不准确。 java 中的这个 MVCE 显示了问题:
float sum = 0;
List<Float> nums = new ArrayList<>();
for(int i = 1; i<=10000; i++) {
nums.add((float) (Math.random()));
}
for(float f: nums) {
sum += f;
}
for(float f: nums) {
sum -= f;
}
System.out.println(sum); //Not printing Zero
请注意,使用 double 并不能解决这个问题,只会让它变得不那么重要。
那么这个问题的优雅解决方案是什么?
您无法修复浮点运算的有限精度。
为了避免漂移有两个想法:
做快版,忽略漂移。在固定数量的数字后,努力计算新的 O(n) 的孔和。这样你得到 O(1) 大多数时间但有时 O(n)。我知道这构成了一个 O(n) 算法,但有一个非常小的因素
将float转换成没有算术题的格式。类似于定点表示(可能仅适用于您的值在有限范围内的情况)
这里有两种解决方法。底部是演示每个元素 O(1) 解决方案的代码,其中每个输出仅通过对来自其相关输入的元素求和来计算。它没有从正在进行的总和中累积的舍入误差。 (这意味着它通常可以用于任何归约运算,例如查找最大值或最小值,而不仅仅是加法。)
float
格式跨越 276 位,从 2−149(次正规数的最低有效位)到 2+126(格式最大有限数的最高有效位 [MSB])。最大有限数的 10,000 倍具有 2140 的 MSB,因此和最多跨越 140 − −149 + 1 = 290 位。因此,使用一个 290 位的数组,加上一个符号位,您可以跟踪 10,000 个浮点数的精确总和,没有舍入错误。
(请注意,实现可以包含一些 slack 以避免过多进位。例如,如果将 290 位实现为十个 32 位元素的数组,则完全使用前九个元素和 2 位最后一个元素(加上另一个符号),然后,在最坏的情况下,来自低位元素的进位可能会级联所有高位元素。相反,一个人可能会使用 20 个 32 位元素,但主要只使用大约 16 个每个元素的位,允许进位在该元素内累积而不是被添加到更高的元素。只有当它积累到需要实际进位或检查高元素以读取当前总数时,才有必要协调松弛.)
第二种方法可以为 N 个元素提供 O(N) 的复杂度,因此每个元素的复杂度为 O(1)(对于固定的 window 大小)。为此,让我们从数组 I
从 I[0]
到 I[9999]
的前 10,000 个元素开始。向后累积部分和。给定一些数组 T
(这可以是临时数组,或者可以在输出数组中完成工作),设置 T[9999]
= I[9999]
、T[9998] = T[9999] + I[9998]
等等——一般来说,T[i] = T[i+1] + I[i]
。完成后,T[0]
将包含前 10,000 个输入的总和,因此这是我们的第一个输出。
现在请注意,T[1]
是第一个之后的 9,999 个输入的总和,因此 T[1] + I[10000]
是第二个输出。事实上,到目前为止计算的每个 T[i]
都是我们想要的总和的 前缀 ;它是输出总和的初始 10000−i
个元素的总和 i
。那么我们要完成的就是匹配后缀。到目前为止计算的每个前缀都以 9,999 结尾;添加到其中的最后一个(索引最高的)输入元素是 I[9999]
。因此,如果我们以 I[10000]
开始部分和 S
,它就是我们需要添加到 T[1]
的后缀。然后将 I[10001]
添加到 S
给我们需要添加到 T[2]
的后缀,依此类推。因此,在第一个输出之后,9,999 个输出中的每一个都可以通过用新输入更新 S
并输出 S
和 T[i]
的总和来计算。这个块的最终输出可以通过用另一个输入更新 S
并输出它来产生——此时 S
是 10,000 个元素的后缀总和,需要一个零元素的前缀,所以不需要加法任何 T[i]
是必需的。
因此,用于计算前缀的 9,999 次加法和用于更新 S 的 9,999 次加法产生了 10,0001 个输出,因此每个输出只有不到两个加法。然后可以对另一个包含 10,000 个值的块重复此过程。
输出可能仍然存在浮点运算的舍入误差,但每个输出仅由仅涉及其输入的有限多次“单向”加法构成。没有从持续的求和中减去较早的元素,随着时间的推移累积更多的舍入误差。
这是演示代码。
#include <stdlib.h>
#define Operation(a, b) ((a) + (b))
/* Sliding window reduction.
For 0 <= i < N, each C[i] is the result of a reduction operation of the W
elements of A from A[i] to A[i+W-1]. The Operation macro defines the
reduction operation.
*/
void SlidingWindowReduction(const float *A, float *C, size_t N, size_t W)
{
// Do whole segments of length W+1.
for (; W < N; N -= W+1, A += W+1, C += W+1)
{
// Build prefixes. C[W-w] receives Operation(A[W-w] to A[W-1]).
float p = A[W-1];
for (size_t w = 1; w < W; ++w)
{
C[W-w] = p;
float t = A[W-1-w];
p = Operation(p, t);
}
C[0] = p;
// Build suffixes and paste them onto prefixes.
// Each suffix is Operation(C[W] to C[W+w]).
float s = A[W];
for (size_t w = 1; w < W; ++w)
{
// Paste prefix (w to W-1) with suffix (W to W+w-1).
float p = C[w];
C[w] = Operation(p, s);
// Build suffix.
float t = A[W+w];
s = Operation(s, t);
}
C[W] = s;
}
// Do final segment.
if (N)
{
// Build prefixes. C[W-w] receives Operation(A[W-w] to A[W-1]).
float p = A[W-1];
for (size_t w = 1; w < W-N+1; ++w) // Unstored prefixes.
{
float t = A[W-1-w];
p = Operation(p, t);
}
for (size_t w = W-N+1; w < W; ++w) // Stored prefixes.
{
C[W-w] = p;
float t = A[W-1-w];
p = Operation(p, t);
}
C[0] = p;
// Build suffixes and paste them onto prefixes.
// Each suffix is Operation(C[W] to C[W+w]).
if (1 < N)
{
float s = A[W];
for (size_t w = 1; w < N-1; ++w)
{
// Paste prefix (w to W-1) with suffix (W to W+w-1).
float p = C[w];
C[w] = Operation(p, s);
// Build suffix.
float t = A[W+w];
s = Operation(s, t);
}
{
float p = C[N-1];
C[N-1] = Operation(p, s);
}
}
}
}
#include <stdio.h>
#define NumberOf(a) (sizeof (a) / sizeof *(a))
int main(void)
{
enum { W = 3 }; // Number of elements in window.
float A[] = { 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12 };
float C[NumberOf(A) - W + 1] = { 0 };
SlidingWindowReduction(A, C, NumberOf(C), 3);
for (size_t c = 0; c < NumberOf(C); ++c)
printf("C[%zu] = %g.\n", c, C[c]);
}
可以在不使用减法的情况下以硬实时方式保持 windowed 平均值。在下面未经测试的代码中,a
存储了 window 中最近最少添加的元素的后缀和,b
存储了下一个后缀和(正在构建),而 c
存储最近添加的元素。 (实际上可以对 a
和 c
使用相同的数组,但这确实会使代码更难理解。)
public final class WindowedAverage {
public WindowedAverage(final int n) {
assert n > 0;
scalar = 1 / (float) n;
assert n % 2 == 0;
half_n = n / 2;
a = new float[half_n];
b = new float[half_n];
b_sum = 0;
c = new float[half_n];
c_sum = 0;
i = 0;
}
public float average() {
return a[i] + b_sum + c_sum;
}
void add(final float s) {
final float term = scalar * s;
c[i] = term;
c_sum += term;
i++;
if (i < half_n) {
b[half_n - (i + 1)] += b[half_n - i];
} else {
float[] temp = a;
a = b;
b = c;
b_sum = c_sum;
c = temp;
c_sum = 0;
i = 0;
}
}
private final float scalar;
private final int half_n;
private final float[] a;
private final float[] b;
private float b_sum;
private final float[] c;
private float c_sum;
private int i;
}
让我们假设我有一个无限的浮动流 s_i
。我想计算最后 10000 个值的平均值。
这个问题有一个简单而低效的解决方案。只需将最后 10000 个值存储在 list/queue 中,并通过每次迭代所有元素来计算平均值。这导致运行时间为 O(n),其中 n 是列表的长度。这个解决方案对于我的用例来说效率太低了。
这个问题的快速解决方案是引入一个变量来保存最后 10000 个元素的总和。每当一个新元素到达时,它将被添加到变量中,而最旧的(存储在队列中)将从变量中减去。这个解决方案是 O(1)。 (是的,内存仍然是 O(n),但内存根本不是问题)
但是,“快速解决方案”存在问题。由于浮点不精确和舍入误差,总和会随着时间的推移而漂移并变得不准确。 java 中的这个 MVCE 显示了问题:
float sum = 0;
List<Float> nums = new ArrayList<>();
for(int i = 1; i<=10000; i++) {
nums.add((float) (Math.random()));
}
for(float f: nums) {
sum += f;
}
for(float f: nums) {
sum -= f;
}
System.out.println(sum); //Not printing Zero
请注意,使用 double 并不能解决这个问题,只会让它变得不那么重要。
那么这个问题的优雅解决方案是什么?
您无法修复浮点运算的有限精度。 为了避免漂移有两个想法:
做快版,忽略漂移。在固定数量的数字后,努力计算新的 O(n) 的孔和。这样你得到 O(1) 大多数时间但有时 O(n)。我知道这构成了一个 O(n) 算法,但有一个非常小的因素
将float转换成没有算术题的格式。类似于定点表示(可能仅适用于您的值在有限范围内的情况)
这里有两种解决方法。底部是演示每个元素 O(1) 解决方案的代码,其中每个输出仅通过对来自其相关输入的元素求和来计算。它没有从正在进行的总和中累积的舍入误差。 (这意味着它通常可以用于任何归约运算,例如查找最大值或最小值,而不仅仅是加法。)
float
格式跨越 276 位,从 2−149(次正规数的最低有效位)到 2+126(格式最大有限数的最高有效位 [MSB])。最大有限数的 10,000 倍具有 2140 的 MSB,因此和最多跨越 140 − −149 + 1 = 290 位。因此,使用一个 290 位的数组,加上一个符号位,您可以跟踪 10,000 个浮点数的精确总和,没有舍入错误。
(请注意,实现可以包含一些 slack 以避免过多进位。例如,如果将 290 位实现为十个 32 位元素的数组,则完全使用前九个元素和 2 位最后一个元素(加上另一个符号),然后,在最坏的情况下,来自低位元素的进位可能会级联所有高位元素。相反,一个人可能会使用 20 个 32 位元素,但主要只使用大约 16 个每个元素的位,允许进位在该元素内累积而不是被添加到更高的元素。只有当它积累到需要实际进位或检查高元素以读取当前总数时,才有必要协调松弛.)
第二种方法可以为 N 个元素提供 O(N) 的复杂度,因此每个元素的复杂度为 O(1)(对于固定的 window 大小)。为此,让我们从数组 I
从 I[0]
到 I[9999]
的前 10,000 个元素开始。向后累积部分和。给定一些数组 T
(这可以是临时数组,或者可以在输出数组中完成工作),设置 T[9999]
= I[9999]
、T[9998] = T[9999] + I[9998]
等等——一般来说,T[i] = T[i+1] + I[i]
。完成后,T[0]
将包含前 10,000 个输入的总和,因此这是我们的第一个输出。
现在请注意,T[1]
是第一个之后的 9,999 个输入的总和,因此 T[1] + I[10000]
是第二个输出。事实上,到目前为止计算的每个 T[i]
都是我们想要的总和的 前缀 ;它是输出总和的初始 10000−i
个元素的总和 i
。那么我们要完成的就是匹配后缀。到目前为止计算的每个前缀都以 9,999 结尾;添加到其中的最后一个(索引最高的)输入元素是 I[9999]
。因此,如果我们以 I[10000]
开始部分和 S
,它就是我们需要添加到 T[1]
的后缀。然后将 I[10001]
添加到 S
给我们需要添加到 T[2]
的后缀,依此类推。因此,在第一个输出之后,9,999 个输出中的每一个都可以通过用新输入更新 S
并输出 S
和 T[i]
的总和来计算。这个块的最终输出可以通过用另一个输入更新 S
并输出它来产生——此时 S
是 10,000 个元素的后缀总和,需要一个零元素的前缀,所以不需要加法任何 T[i]
是必需的。
因此,用于计算前缀的 9,999 次加法和用于更新 S 的 9,999 次加法产生了 10,0001 个输出,因此每个输出只有不到两个加法。然后可以对另一个包含 10,000 个值的块重复此过程。
输出可能仍然存在浮点运算的舍入误差,但每个输出仅由仅涉及其输入的有限多次“单向”加法构成。没有从持续的求和中减去较早的元素,随着时间的推移累积更多的舍入误差。
这是演示代码。
#include <stdlib.h>
#define Operation(a, b) ((a) + (b))
/* Sliding window reduction.
For 0 <= i < N, each C[i] is the result of a reduction operation of the W
elements of A from A[i] to A[i+W-1]. The Operation macro defines the
reduction operation.
*/
void SlidingWindowReduction(const float *A, float *C, size_t N, size_t W)
{
// Do whole segments of length W+1.
for (; W < N; N -= W+1, A += W+1, C += W+1)
{
// Build prefixes. C[W-w] receives Operation(A[W-w] to A[W-1]).
float p = A[W-1];
for (size_t w = 1; w < W; ++w)
{
C[W-w] = p;
float t = A[W-1-w];
p = Operation(p, t);
}
C[0] = p;
// Build suffixes and paste them onto prefixes.
// Each suffix is Operation(C[W] to C[W+w]).
float s = A[W];
for (size_t w = 1; w < W; ++w)
{
// Paste prefix (w to W-1) with suffix (W to W+w-1).
float p = C[w];
C[w] = Operation(p, s);
// Build suffix.
float t = A[W+w];
s = Operation(s, t);
}
C[W] = s;
}
// Do final segment.
if (N)
{
// Build prefixes. C[W-w] receives Operation(A[W-w] to A[W-1]).
float p = A[W-1];
for (size_t w = 1; w < W-N+1; ++w) // Unstored prefixes.
{
float t = A[W-1-w];
p = Operation(p, t);
}
for (size_t w = W-N+1; w < W; ++w) // Stored prefixes.
{
C[W-w] = p;
float t = A[W-1-w];
p = Operation(p, t);
}
C[0] = p;
// Build suffixes and paste them onto prefixes.
// Each suffix is Operation(C[W] to C[W+w]).
if (1 < N)
{
float s = A[W];
for (size_t w = 1; w < N-1; ++w)
{
// Paste prefix (w to W-1) with suffix (W to W+w-1).
float p = C[w];
C[w] = Operation(p, s);
// Build suffix.
float t = A[W+w];
s = Operation(s, t);
}
{
float p = C[N-1];
C[N-1] = Operation(p, s);
}
}
}
}
#include <stdio.h>
#define NumberOf(a) (sizeof (a) / sizeof *(a))
int main(void)
{
enum { W = 3 }; // Number of elements in window.
float A[] = { 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12 };
float C[NumberOf(A) - W + 1] = { 0 };
SlidingWindowReduction(A, C, NumberOf(C), 3);
for (size_t c = 0; c < NumberOf(C); ++c)
printf("C[%zu] = %g.\n", c, C[c]);
}
可以在不使用减法的情况下以硬实时方式保持 windowed 平均值。在下面未经测试的代码中,a
存储了 window 中最近最少添加的元素的后缀和,b
存储了下一个后缀和(正在构建),而 c
存储最近添加的元素。 (实际上可以对 a
和 c
使用相同的数组,但这确实会使代码更难理解。)
public final class WindowedAverage {
public WindowedAverage(final int n) {
assert n > 0;
scalar = 1 / (float) n;
assert n % 2 == 0;
half_n = n / 2;
a = new float[half_n];
b = new float[half_n];
b_sum = 0;
c = new float[half_n];
c_sum = 0;
i = 0;
}
public float average() {
return a[i] + b_sum + c_sum;
}
void add(final float s) {
final float term = scalar * s;
c[i] = term;
c_sum += term;
i++;
if (i < half_n) {
b[half_n - (i + 1)] += b[half_n - i];
} else {
float[] temp = a;
a = b;
b = c;
b_sum = c_sum;
c = temp;
c_sum = 0;
i = 0;
}
}
private final float scalar;
private final int half_n;
private final float[] a;
private final float[] b;
private float b_sum;
private final float[] c;
private float c_sum;
private int i;
}