Python 中的一维 Wasserstein 距离
1D Wasserstein distance in Python
下面的公式是 Wasserstein distance/optimal 传输的一个特例,当源和目标分布 x
和 y
(也称为边际分布)为一维时,即, 是向量。
其中 F^{-1} 是边际累积分布 u
和 v
的逆概率分布函数,来自真实数据称为 x
和 y
,均从正态分布生成:
import numpy as np
from numpy.random import randn
import scipy.stats as ss
n = 100
x = randn(n)
y = randn(n)
python和scipy公式中的积分如何编码?我猜 x 和 y 必须转换为排名边缘,它们是非负的并且总和为 1,而 Scipy 的 ppf
可用于计算逆 F^{-1}的?
请注意,当 n 变大时,我们有一组排序的 n 样本接近以 1/n 采样的逆 CDF, 2/n, ..., n/n。例如:
import numpy as np
import matplotlib.pyplot as plt
from scipy.stats import norm
plt.plot(norm.ppf(np.linspace(0, 1, 1000)), label="invcdf")
plt.plot(np.sort(np.random.normal(size=1000)), label="sortsample")
plt.legend()
plt.show()
另请注意,从 0 到 1 的积分可以近似为 1/n、2/n、... 的和,n/n。
因此我们可以简单地回答你的问题:
def W(p, u, v):
assert len(u) == len(v)
return np.mean(np.abs(np.sort(u) - np.sort(v))**p)**(1/p)
请注意,如果 len(u) != len(v)
您仍然可以使用线性插值法:
def W(p, u, v):
u = np.sort(u)
v = np.sort(v)
if len(u) != len(v):
if len(u) > len(v): u, v = v, u
us = np.linspace(0, 1, len(u))
vs = np.linspace(0, 1, len(v))
u = np.linalg.interp(u, us, vs)
return np.mean(np.abs(u - v)**p)**(1/p)
如果您有关于数据分布类型的先验信息,但没有关于其参数的信息,另一种方法是为您的数据找到最适合的分布(例如 scipy.stats.norm.fit
)u
和 v
然后以所需的精度进行积分。例如:
from scipy.stats import norm as gauss
def W_gauss(p, u, v, num_steps):
ud = gauss(*gauss.fit(u))
vd = gauss(*gauss.fit(v))
z = np.linspace(0, 1, num_steps, endpoint=False) + 1/(2*num_steps)
return np.mean(np.abs(ud.ppf(z) - vd.ppf(z))**p)**(1/p)
我想我有点晚了但是,但这是我为精确解决方案所做的(仅使用 numpy):
import numpy as np
from numpy.random import randn
n = 100
m = 80
p = 2
x = np.sort(randn(n))
y = np.sort(randn(m))
a = np.ones(n)/n
b = np.ones(m)/m
# cdfs
ca = np.cumsum(a)
cb = np.cumsum(b)
# points on which we need to evaluate the quantile functions
cba = np.sort(np.hstack([ca, cb]))
# weights for integral
h = np.diff(np.hstack([0, cba]))
# construction of first quantile function
bins = ca + 1e-10 # small tolerance to avoid rounding errors and enforce right continuity
index_qx = np.digitize(cba, bins, right=True) # right=True becouse quantile function is
# right continuous
qx = x[index_qx] # quantile funciton F^{-1}
# construction of second quantile function
bins = cb + 1e-10
index_qy = np.digitize(cba, bins, right=True) # right=True becouse quantile function is
# right continuous
qy = y[index_qy] # quantile funciton G^{-1}
ot_cost = np.sum((qx - qy)**p * h)
print(ot_cost)
如果您有兴趣,您可以在此处找到更详细的基于 numpy 的 ot 问题实现,以及对偶和原始解决方案:https://github.com/gnies/1d-optimal-transport。 (虽然我仍在努力)。
下面的公式是 Wasserstein distance/optimal 传输的一个特例,当源和目标分布 x
和 y
(也称为边际分布)为一维时,即, 是向量。
其中 F^{-1} 是边际累积分布 u
和 v
的逆概率分布函数,来自真实数据称为 x
和 y
,均从正态分布生成:
import numpy as np
from numpy.random import randn
import scipy.stats as ss
n = 100
x = randn(n)
y = randn(n)
python和scipy公式中的积分如何编码?我猜 x 和 y 必须转换为排名边缘,它们是非负的并且总和为 1,而 Scipy 的 ppf
可用于计算逆 F^{-1}的?
请注意,当 n 变大时,我们有一组排序的 n 样本接近以 1/n 采样的逆 CDF, 2/n, ..., n/n。例如:
import numpy as np
import matplotlib.pyplot as plt
from scipy.stats import norm
plt.plot(norm.ppf(np.linspace(0, 1, 1000)), label="invcdf")
plt.plot(np.sort(np.random.normal(size=1000)), label="sortsample")
plt.legend()
plt.show()
另请注意,从 0 到 1 的积分可以近似为 1/n、2/n、... 的和,n/n。
因此我们可以简单地回答你的问题:
def W(p, u, v):
assert len(u) == len(v)
return np.mean(np.abs(np.sort(u) - np.sort(v))**p)**(1/p)
请注意,如果 len(u) != len(v)
您仍然可以使用线性插值法:
def W(p, u, v):
u = np.sort(u)
v = np.sort(v)
if len(u) != len(v):
if len(u) > len(v): u, v = v, u
us = np.linspace(0, 1, len(u))
vs = np.linspace(0, 1, len(v))
u = np.linalg.interp(u, us, vs)
return np.mean(np.abs(u - v)**p)**(1/p)
如果您有关于数据分布类型的先验信息,但没有关于其参数的信息,另一种方法是为您的数据找到最适合的分布(例如 scipy.stats.norm.fit
)u
和 v
然后以所需的精度进行积分。例如:
from scipy.stats import norm as gauss
def W_gauss(p, u, v, num_steps):
ud = gauss(*gauss.fit(u))
vd = gauss(*gauss.fit(v))
z = np.linspace(0, 1, num_steps, endpoint=False) + 1/(2*num_steps)
return np.mean(np.abs(ud.ppf(z) - vd.ppf(z))**p)**(1/p)
我想我有点晚了但是,但这是我为精确解决方案所做的(仅使用 numpy):
import numpy as np
from numpy.random import randn
n = 100
m = 80
p = 2
x = np.sort(randn(n))
y = np.sort(randn(m))
a = np.ones(n)/n
b = np.ones(m)/m
# cdfs
ca = np.cumsum(a)
cb = np.cumsum(b)
# points on which we need to evaluate the quantile functions
cba = np.sort(np.hstack([ca, cb]))
# weights for integral
h = np.diff(np.hstack([0, cba]))
# construction of first quantile function
bins = ca + 1e-10 # small tolerance to avoid rounding errors and enforce right continuity
index_qx = np.digitize(cba, bins, right=True) # right=True becouse quantile function is
# right continuous
qx = x[index_qx] # quantile funciton F^{-1}
# construction of second quantile function
bins = cb + 1e-10
index_qy = np.digitize(cba, bins, right=True) # right=True becouse quantile function is
# right continuous
qy = y[index_qy] # quantile funciton G^{-1}
ot_cost = np.sum((qx - qy)**p * h)
print(ot_cost)
如果您有兴趣,您可以在此处找到更详细的基于 numpy 的 ot 问题实现,以及对偶和原始解决方案:https://github.com/gnies/1d-optimal-transport。 (虽然我仍在努力)。