Python 中的一维 Wasserstein 距离

1D Wasserstein distance in Python

下面的公式是 Wasserstein distance/optimal 传输的一个特例,当源和目标分布 xy(也称为边际分布)为一维时,即, 是向量。

其中 F^{-1} 是边际累积分布 uv 的逆概率分布函数,来自真实数据称为 xy,均从正态分布生成:

import numpy as np
from numpy.random import randn
import scipy.stats as ss

n = 100
x = randn(n)
y = randn(n)

python和scipy公式中的积分如何编码?我猜 x 和 y 必须转换为排名边缘,它们是非负的并且总和为 1,而 Scipy 的 ppf 可用于计算逆 F^{-1}的?

请注意,当 n 变大时,我们有一组排序的 n 样本接近以 1/n 采样的逆 CDF, 2/n, ..., n/n。例如:

import numpy as np
import matplotlib.pyplot as plt
from scipy.stats import norm
plt.plot(norm.ppf(np.linspace(0, 1, 1000)), label="invcdf")
plt.plot(np.sort(np.random.normal(size=1000)), label="sortsample")
plt.legend()
plt.show()

另请注意,从 0 到 1 的积分可以近似为 1/n、2/n、... 的和,n/n。

因此我们可以简单地回答你的问题:

def W(p, u, v):
    assert len(u) == len(v)
    return np.mean(np.abs(np.sort(u) - np.sort(v))**p)**(1/p)

请注意,如果 len(u) != len(v) 您仍然可以使用线性插值法:

def W(p, u, v):
    u = np.sort(u)
    v = np.sort(v)
    if len(u) != len(v):
        if len(u) > len(v): u, v = v, u
        us = np.linspace(0, 1, len(u))
        vs = np.linspace(0, 1, len(v))
        u = np.linalg.interp(u, us, vs)
    return np.mean(np.abs(u - v)**p)**(1/p)

如果您有关于数据分布类型的先验信息,但没有关于其参数的信息,另一种方法是为您的数据找到最适合的分布(例如 scipy.stats.norm.fituv 然后以所需的精度进行积分。例如:

from scipy.stats import norm as gauss
def W_gauss(p, u, v, num_steps):
    ud = gauss(*gauss.fit(u))
    vd = gauss(*gauss.fit(v))
    z = np.linspace(0, 1, num_steps, endpoint=False) + 1/(2*num_steps)
    return np.mean(np.abs(ud.ppf(z) - vd.ppf(z))**p)**(1/p)

我想我有点晚了但是,但这是我为精确解决方案所做的(仅使用 numpy):

import numpy as np
from numpy.random import randn
n = 100
m = 80
p = 2
x = np.sort(randn(n))
y = np.sort(randn(m))
a = np.ones(n)/n
b = np.ones(m)/m
# cdfs
ca = np.cumsum(a)
cb = np.cumsum(b)

# points on which we need to evaluate the quantile functions
cba = np.sort(np.hstack([ca, cb]))
# weights for integral
h = np.diff(np.hstack([0, cba]))

# construction of first quantile function
bins = ca + 1e-10 # small tolerance to avoid rounding errors and enforce right continuity
index_qx = np.digitize(cba, bins, right=True)    # right=True becouse quantile function is 
                                                 # right continuous
qx = x[index_qx] # quantile funciton F^{-1}      

# construction of second quantile function 
bins = cb + 1e-10 
index_qy = np.digitize(cba, bins, right=True)    # right=True becouse quantile function is 
                                                 # right continuous
qy = y[index_qy] # quantile funciton G^{-1}

ot_cost = np.sum((qx - qy)**p * h)
print(ot_cost)
        

如果您有兴趣,您可以在此处找到更详细的基于 numpy 的 ot 问题实现,以及对偶和原始解决方案:https://github.com/gnies/1d-optimal-transport。 (虽然我仍在努力)。