Variance/Std 批量更新的 Welford 算法的公式是什么?
What's the formula for Welford's Algorithm for Variance/Std with Batch Updates?
我想扩展 Welford 的在线算法,以便能够更新多个数字(批量),而不是一次只更新一个:https://en.wikipedia.org/wiki/Algorithms_for_calculating_variance
我尝试从 wiki 页面更新算法,如下所示:
# my attempt.
def update1(existingAggregate, newValues):
(count, mean, M2) = existingAggregate
count += len(newValues)
delta = np.sum(np.subtract(newValues, [mean] * len(newValues)))
mean += delta / count
delta2 = np.sum(np.subtract(newValues, [mean] * len(newValues)))
M2 += delta * delta2
return (count, mean, M2)
# The original two functions from wikipedia.
def update(existingAggregate, newValue):
(count, mean, M2) = existingAggregate
count += 1
delta = newValue - mean
mean += delta / count
delta2 = newValue - mean
M2 += delta * delta2
def finalize(existingAggregate):
(count, mean, M2) = existingAggregate
(mean, variance, sampleVariance) = (mean, M2/count, M2/(count - 1))
if count < 2:
return float('nan')
else:
return (mean, variance, sampleVariance)
不过,我一定没有理解正确,因为结果是错误的:
# example x that might have led to an a = (2, 2.0, 2.0).
x = [1.0, 3.0]
mean = np.mean(x)
count = len(x)
m2 = np.sum(np.subtract(x, [mean] * count)**2)
a = (count, mean, m2)
print(a)
# new batch of values.
b = [5, 3]
注意 a = (2, 2.0, 2.0) 表示我们有 2 个观测值,它们的平均值是 2.0。
# update one at a time.
temp = update(a, newValues[0])
result_single = update(temp, newValues[1])
print(finalize(result_single))
# update with my faulty batch function.
result_batch = update1(a, newValues)
print(finalize(result_batch))
正确的输出应该是两次应用单个数字更新的结果:
(3.0, 2.0, 2.6666666666666665)
(3.0, 2.5, 3.3333333333333335)
关于正确的方差更新,我遗漏了什么?我是否也需要以某种方式更新 finalize 函数?
我需要这样做的原因是因为我正在处理非常大的月度文件(具有不同数量的观察值)并且我需要获得年度均值和方差。
我不太熟悉Python,所以我宁愿坚持数学符号。
要更新均值,您必须执行以下操作:
s = sum of new values
c = number of new values
newMean = oldMean + sum_i (newValue[i] - oldMean) / newCount
对于M2
,您必须添加另一个求和:
newM2 = oldM2 + sum_i ((newValue[i] - newMean) * (newValue[i] - oldMean))
我不确定你是否真的用这个批量更新保存了任何东西,因为你里面还有一个循环。
感谢 Nico 的澄清,我明白了!
问题是我对增量求和然后乘以得到 M2,但必须对增量的乘积求和。
这是能够接受单个数字和批次的正确批处理函数:
# https://en.wikipedia.org/wiki/Algorithms_for_calculating_variance
def update(existingAggregate, newValues):
if isinstance(newValues, (int, float, complex)):
# Handle single digits.
newValues = [newValues]
(count, mean, M2) = existingAggregate
count += len(newValues)
# newvalues - oldMean
delta = np.subtract(newValues, [mean] * len(newValues))
mean += np.sum(delta / count)
# newvalues - newMeant
delta2 = np.subtract(newValues, [mean] * len(newValues))
M2 += np.sum(delta * delta2)
return (count, mean, M2)
def finalize(existingAggregate):
(count, mean, M2) = existingAggregate
(mean, variance, sampleVariance) = (mean, M2/count, M2/(count - 1))
if count < 2:
return float('nan')
else:
return (mean, variance, sampleVariance)
示例用法:
x = [1.0, 3.0]
mean = np.mean(x)
count = len(x)
m2 = np.sum(np.subtract(x, [mean] * count)**2)
a = (count, mean, m2)
print(a)
# new batch of values.
b = [5, 3]
result_batch = update(a, b)
result_batch1 = update(a, b[0])
print(finalize(result_batch))
print(finalize(result_batch1))
而且确实更快:
import timeit
x = random.sample(range(1, 10000), 1000)
# ...
b = random.sample(range(1, 10000), 1000)
start_time = timeit.default_timer()
result_batch = update(a, b)
print(f'{timeit.default_timer() - start_time:.4f}')
print(*(f'{x:.2f}' for x in finalize(result_batch)))
start_time = timeit.default_timer()
for i in b:
a = update1(a, i)
print(f'{timeit.default_timer() - start_time:.4f}')
print(*(f'{x:.2f}' for x in finalize(result_batch)))
结果:
0.0010
5008.36 8423224.68 8427438.40
0.0031
5008.36 8423224.68 8427438.40
加上前面的答案 derivation for the generalized case (calculate s_{n+k} given s_n) follows similarly to the proof for the k=1 version,得到:
其中 u_{n+k} 也是 calculated batch-wise 通过:
您还可以考虑一种基于计算 mean/variance of c = A union B from the mean/variance of A and B, that is easily parallelizable (Chan's algorithm) 的方法。
我想扩展 Welford 的在线算法,以便能够更新多个数字(批量),而不是一次只更新一个:https://en.wikipedia.org/wiki/Algorithms_for_calculating_variance
我尝试从 wiki 页面更新算法,如下所示:
# my attempt.
def update1(existingAggregate, newValues):
(count, mean, M2) = existingAggregate
count += len(newValues)
delta = np.sum(np.subtract(newValues, [mean] * len(newValues)))
mean += delta / count
delta2 = np.sum(np.subtract(newValues, [mean] * len(newValues)))
M2 += delta * delta2
return (count, mean, M2)
# The original two functions from wikipedia.
def update(existingAggregate, newValue):
(count, mean, M2) = existingAggregate
count += 1
delta = newValue - mean
mean += delta / count
delta2 = newValue - mean
M2 += delta * delta2
def finalize(existingAggregate):
(count, mean, M2) = existingAggregate
(mean, variance, sampleVariance) = (mean, M2/count, M2/(count - 1))
if count < 2:
return float('nan')
else:
return (mean, variance, sampleVariance)
不过,我一定没有理解正确,因为结果是错误的:
# example x that might have led to an a = (2, 2.0, 2.0).
x = [1.0, 3.0]
mean = np.mean(x)
count = len(x)
m2 = np.sum(np.subtract(x, [mean] * count)**2)
a = (count, mean, m2)
print(a)
# new batch of values.
b = [5, 3]
注意 a = (2, 2.0, 2.0) 表示我们有 2 个观测值,它们的平均值是 2.0。
# update one at a time.
temp = update(a, newValues[0])
result_single = update(temp, newValues[1])
print(finalize(result_single))
# update with my faulty batch function.
result_batch = update1(a, newValues)
print(finalize(result_batch))
正确的输出应该是两次应用单个数字更新的结果:
(3.0, 2.0, 2.6666666666666665)
(3.0, 2.5, 3.3333333333333335)
关于正确的方差更新,我遗漏了什么?我是否也需要以某种方式更新 finalize 函数?
我需要这样做的原因是因为我正在处理非常大的月度文件(具有不同数量的观察值)并且我需要获得年度均值和方差。
我不太熟悉Python,所以我宁愿坚持数学符号。
要更新均值,您必须执行以下操作:
s = sum of new values
c = number of new values
newMean = oldMean + sum_i (newValue[i] - oldMean) / newCount
对于M2
,您必须添加另一个求和:
newM2 = oldM2 + sum_i ((newValue[i] - newMean) * (newValue[i] - oldMean))
我不确定你是否真的用这个批量更新保存了任何东西,因为你里面还有一个循环。
感谢 Nico 的澄清,我明白了! 问题是我对增量求和然后乘以得到 M2,但必须对增量的乘积求和。 这是能够接受单个数字和批次的正确批处理函数:
# https://en.wikipedia.org/wiki/Algorithms_for_calculating_variance
def update(existingAggregate, newValues):
if isinstance(newValues, (int, float, complex)):
# Handle single digits.
newValues = [newValues]
(count, mean, M2) = existingAggregate
count += len(newValues)
# newvalues - oldMean
delta = np.subtract(newValues, [mean] * len(newValues))
mean += np.sum(delta / count)
# newvalues - newMeant
delta2 = np.subtract(newValues, [mean] * len(newValues))
M2 += np.sum(delta * delta2)
return (count, mean, M2)
def finalize(existingAggregate):
(count, mean, M2) = existingAggregate
(mean, variance, sampleVariance) = (mean, M2/count, M2/(count - 1))
if count < 2:
return float('nan')
else:
return (mean, variance, sampleVariance)
示例用法:
x = [1.0, 3.0]
mean = np.mean(x)
count = len(x)
m2 = np.sum(np.subtract(x, [mean] * count)**2)
a = (count, mean, m2)
print(a)
# new batch of values.
b = [5, 3]
result_batch = update(a, b)
result_batch1 = update(a, b[0])
print(finalize(result_batch))
print(finalize(result_batch1))
而且确实更快:
import timeit
x = random.sample(range(1, 10000), 1000)
# ...
b = random.sample(range(1, 10000), 1000)
start_time = timeit.default_timer()
result_batch = update(a, b)
print(f'{timeit.default_timer() - start_time:.4f}')
print(*(f'{x:.2f}' for x in finalize(result_batch)))
start_time = timeit.default_timer()
for i in b:
a = update1(a, i)
print(f'{timeit.default_timer() - start_time:.4f}')
print(*(f'{x:.2f}' for x in finalize(result_batch)))
结果:
0.0010
5008.36 8423224.68 8427438.40
0.0031
5008.36 8423224.68 8427438.40
加上前面的答案 derivation for the generalized case (calculate s_{n+k} given s_n) follows similarly to the proof for the k=1 version,得到:
其中 u_{n+k} 也是 calculated batch-wise 通过:
您还可以考虑一种基于计算 mean/variance of c = A union B from the mean/variance of A and B, that is easily parallelizable (Chan's algorithm) 的方法。