Python 中向量的高性能全对全比较
High performance all-to-all comparison of vectors in Python
首先介绍一下背景:聚类之间的几种比较方法依赖于所谓的对计数。我们在相同的 n
实体上有两个平面聚类向量 a
和 b
。在对所有可能的实体对进行对计数时,我们检查它们是否在两者中都属于同一集群,或者在 a
中属于相同但在 b
中不同,或者相反,或者在两个都。这样我们得到 4 个计数,我们称它们为 n11, n10, n01, n00
。这些是不同指标的输入。
当实体数量在 10,000 左右,要比较的聚类数量为数十个或更多时,性能成为问题,因为每次比较的对数为 10^8
,并且对于聚类的全部比较这需要执行 10^4
次。
用一个天真的 Python 实现真的花了很长时间,所以我转向了 Cython 和 numpy。这样我就可以将一次比较的时间缩短到 0.9-3.0
秒左右。就我而言,这仍然意味着半天的运行时间。我想知道您是否看到了通过一些聪明的算法或 C 库或其他任何东西来实现性能成就的可能性。
这是我的尝试:
1) 这在没有为所有对分配巨大数组的情况下进行计数,采用长度为 n
的 2 个成员向量 a1, a2
和 returns 计数:
cimport cython
import numpy as np
cimport numpy as np
ctypedef np.int32_t DTYPE_t
@cython.boundscheck(False)
def pair_counts(
np.ndarray[DTYPE_t, ndim = 1] a1,
np.ndarray[DTYPE_t, ndim = 1] a2,
):
cdef unsigned int a1s = a1.shape[0]
cdef unsigned int a2s = a2.shape[0]
cdef unsigned int n11, n10, n01, n00
n11 = n10 = n01 = n00 = 0
cdef unsigned int j0
for i in range(0, a1s - 1):
j0 = i + 1
for j in range(j0, a2s):
if a1[i] == a1[j] and a2[i] == a2[j]:
n11 += 1
elif a1[i] == a1[j]:
n10 += 1
elif a2[i] == a2[j]:
n01 += 1
else:
n00 += 1
return n11, n10, n01, n00
2) 这首先为 2 个聚类中的每一个计算成员向量(长度 n * (n-1) / 2
,每个实体对一个元素),然后计算这些向量的计数。它在每次比较时分配 ~20-40M 内存,但有趣的是,比之前更快。注意:c
是围绕集群的包装器 class,具有通常的成员向量,还有一个 c.members
dict,它包含 numpy 数组中每个集群的实体索引。
cimport cython
import numpy as np
cimport numpy as np
@cython.boundscheck(False)
def comembership(c):
"""
Returns comembership vector, where each value tells if one pair
of entites belong to the same group (1) or not (0).
"""
cdef int n = len(c.memberships)
cdef int cnum = c.cnum
cdef int ri, i, ii, j, li
cdef unsigned char[:] cmem = \
np.zeros((int(n * (n - 1) / 2), ), dtype = np.uint8)
for ci in xrange(cnum):
# here we use the members dict to have the indices of entities
# in cluster (ci), as a numpy array (mi)
mi = c.members[ci]
for i in xrange(len(mi) - 1):
ii = mi[i]
# this is only to convert the indices of an n x n matrix
# to the indices of a 1 x (n x (n-1) / 2) vector:
ri = n * ii - 3 * ii / 2 - ii ** 2 / 2 - 1
for j in mi[i+1:]:
# also here, adding j only for having the correct index
li = ri + j
cmem[li] = 1
return np.array(cmem)
def pair_counts(c1, c2):
p1 = comembership(c1)
p2 = comembership(c2)
n = len(c1.memberships)
a11 = p1 * p2
n11 = a11.sum()
n10 = (p1 - a11).sum()
n01 = (p2 - a11).sum()
n00 = n - n10 - n01 - n11
return n11, n10, n01, n00
3) 这是一个纯基于 numpy 的解决方案,它创建了一个 n x n
实体对成员身份的布尔数组。输入是成员向量 (a1, a2
).
def pair_counts(a1, a2):
n = len(a1)
cmem1 = a1.reshape([n,1]) == a1.reshape([1,n])
cmem2 = a2.reshape([n,1]) == a2.reshape([1,n])
n11 = int(((cmem1 == cmem2).sum() - n) / 2)
n10 = int((cmem1.sum() - n) / 2) - n11
n01 = int((cmem2.sum() - n) / 2) - n11
n00 = n - n11 - n10 - n01
return n11, n10, n01, n00
编辑: 示例数据
import numpy as np
a1 = np.random.randint(0, 1868, 14440, dtype = np.int32)
a2 = np.random.randint(0, 484, 14440, dtype = np.int32)
# to have the members dicts used in example 2:
def get_cnum(a):
"""
Returns number of clusters.
"""
return len(np.unique(a))
def get_members(a):
"""
Returns a dict with cluster numbers as keys and member entities
as sorted numpy arrays.
"""
members = dict(map(lambda i: (i, []), range(max(a) + 1)))
list(map(lambda m: members[m[1]].append(m[0]),
enumerate(a)))
members = dict(map(lambda m:
(m[0], np.array(sorted(m[1]), dtype = np.int)),
members.items()))
return members
members1 = get_members(a1)
members2 = get_members(a2)
cnum1 = get_cnum(a1)
cnum2 = get_cnum(a2)
基于排序的方法有其优点,但可以做得更简单:
def pair_counts(a, b):
n = a.shape[0] # also b.shape[0]
counts_a = np.bincount(a)
counts_b = np.bincount(b)
sorter_a = np.argsort(a)
n11 = 0
same_a_offset = np.cumsum(counts_a)
for indices in np.split(sorter_a, same_a_offset):
b_check = b[indices]
n11 += np.count_nonzero(b_check == b_check[:,None])
n11 = (n11-n) // 2
n10 = (np.sum(counts_a**2) - n) // 2 - n11
n01 = (np.sum(counts_b**2) - n) // 2 - n11
n00 = n**2 - n - n11 - n10 - n01
return n11, n10, n01, n00
如果在 Cython 中有效地编码此方法,则可以获得另一个加速(可能 ~20x)。
编辑:
我找到了一种完全向量化过程并降低时间复杂度的方法:
def sizes2count(a, n):
return (np.inner(a, a) - n) // 2
def pair_counts_vec_nlogn(a, b):
# Size of "11" clusters (a[i]==a[j] & b[i]==b[j])
ab = a * b.max() + b # make sure max(a)*max(b) fits the dtype!
_, sizes = np.unique(ab, return_counts=True)
# Calculate the counts for each type of pairing
n = len(a) # also len(b)
n11 = sizes2count(sizes, n)
n10 = sizes2count(np.bincount(a), n) - n11
n01 = sizes2count(np.bincount(b), n) - n11
n00 = n**2 - n - n11 - n10 - n01
return n11, n10, n01, n00
def pair_counts_vec_linear(a, b):
# Label "11" clusters (a[i]==a[j] & b[i]==b[j])
ab = a * b.max() + b
# Calculate the counts for each type of pairing
n = len(a) # also len(b)
n11 = sizes2count(np.bincount(ab), n)
n10 = sizes2count(np.bincount(a), n) - n11
n01 = sizes2count(np.bincount(b), n) - n11
n00 = n**2 - n - n11 - n10 - n01
return n11, n10, n01, n00
有时 O(n log(n)) 算法比线性算法快,因为线性算法使用 max(a)*max(b)
存储。命名可能会改进,我不太熟悉术语。
要在线性时间内比较两个聚类 A
和 B
:
- 遍历
A
中的集群。设每个簇的大小为a_i
。 A
中同一簇的总对数是所有a_i*(a_i-1)/2
. 的总和
- 根据
B
中的簇对每个 A 簇进行分区。设每个分区的大小为b_j
。 A
和 B
中同一簇中的对总数是所有 b_j *(b_j-1)/2
. 的总数
- 两者之间的差异是在 A 而不是 B 中属于同一簇的对的总数
- 遍历
B
中的custers,得到B
中同一个簇中的总对数,用(2)的结果减去B
中的同一个簇中的对总数=11=] 但不是 A
.
- 以上3个结果的总和就是A或B中相同的对数。用n*(n-1)/2相减得到不同簇中的总对数在 A 和 B
步骤(2)中的分区是通过为B创建一个字典映射item -> cluster,然后在每个A-cluster中查找每个item来完成的。如果您要交叉比较大量聚类,只需为每个聚类计算一次这些映射并将它们保存在周围,就可以节省大量时间。
您不需要枚举和计数对。
相反,计算一个混淆矩阵,其中包含第一个聚类的每个聚类与第二个聚类的每个聚类的交集大小(这是对所有对象的一个循环),然后使用等式 n*(n-1)/2
.
计算此矩阵中的对数
这会将您的运行时间从 O(n^2) 减少到 O(n),因此它应该会给您带来 可观的 加速。
首先介绍一下背景:聚类之间的几种比较方法依赖于所谓的对计数。我们在相同的 n
实体上有两个平面聚类向量 a
和 b
。在对所有可能的实体对进行对计数时,我们检查它们是否在两者中都属于同一集群,或者在 a
中属于相同但在 b
中不同,或者相反,或者在两个都。这样我们得到 4 个计数,我们称它们为 n11, n10, n01, n00
。这些是不同指标的输入。
当实体数量在 10,000 左右,要比较的聚类数量为数十个或更多时,性能成为问题,因为每次比较的对数为 10^8
,并且对于聚类的全部比较这需要执行 10^4
次。
用一个天真的 Python 实现真的花了很长时间,所以我转向了 Cython 和 numpy。这样我就可以将一次比较的时间缩短到 0.9-3.0
秒左右。就我而言,这仍然意味着半天的运行时间。我想知道您是否看到了通过一些聪明的算法或 C 库或其他任何东西来实现性能成就的可能性。
这是我的尝试:
1) 这在没有为所有对分配巨大数组的情况下进行计数,采用长度为 n
的 2 个成员向量 a1, a2
和 returns 计数:
cimport cython
import numpy as np
cimport numpy as np
ctypedef np.int32_t DTYPE_t
@cython.boundscheck(False)
def pair_counts(
np.ndarray[DTYPE_t, ndim = 1] a1,
np.ndarray[DTYPE_t, ndim = 1] a2,
):
cdef unsigned int a1s = a1.shape[0]
cdef unsigned int a2s = a2.shape[0]
cdef unsigned int n11, n10, n01, n00
n11 = n10 = n01 = n00 = 0
cdef unsigned int j0
for i in range(0, a1s - 1):
j0 = i + 1
for j in range(j0, a2s):
if a1[i] == a1[j] and a2[i] == a2[j]:
n11 += 1
elif a1[i] == a1[j]:
n10 += 1
elif a2[i] == a2[j]:
n01 += 1
else:
n00 += 1
return n11, n10, n01, n00
2) 这首先为 2 个聚类中的每一个计算成员向量(长度 n * (n-1) / 2
,每个实体对一个元素),然后计算这些向量的计数。它在每次比较时分配 ~20-40M 内存,但有趣的是,比之前更快。注意:c
是围绕集群的包装器 class,具有通常的成员向量,还有一个 c.members
dict,它包含 numpy 数组中每个集群的实体索引。
cimport cython
import numpy as np
cimport numpy as np
@cython.boundscheck(False)
def comembership(c):
"""
Returns comembership vector, where each value tells if one pair
of entites belong to the same group (1) or not (0).
"""
cdef int n = len(c.memberships)
cdef int cnum = c.cnum
cdef int ri, i, ii, j, li
cdef unsigned char[:] cmem = \
np.zeros((int(n * (n - 1) / 2), ), dtype = np.uint8)
for ci in xrange(cnum):
# here we use the members dict to have the indices of entities
# in cluster (ci), as a numpy array (mi)
mi = c.members[ci]
for i in xrange(len(mi) - 1):
ii = mi[i]
# this is only to convert the indices of an n x n matrix
# to the indices of a 1 x (n x (n-1) / 2) vector:
ri = n * ii - 3 * ii / 2 - ii ** 2 / 2 - 1
for j in mi[i+1:]:
# also here, adding j only for having the correct index
li = ri + j
cmem[li] = 1
return np.array(cmem)
def pair_counts(c1, c2):
p1 = comembership(c1)
p2 = comembership(c2)
n = len(c1.memberships)
a11 = p1 * p2
n11 = a11.sum()
n10 = (p1 - a11).sum()
n01 = (p2 - a11).sum()
n00 = n - n10 - n01 - n11
return n11, n10, n01, n00
3) 这是一个纯基于 numpy 的解决方案,它创建了一个 n x n
实体对成员身份的布尔数组。输入是成员向量 (a1, a2
).
def pair_counts(a1, a2):
n = len(a1)
cmem1 = a1.reshape([n,1]) == a1.reshape([1,n])
cmem2 = a2.reshape([n,1]) == a2.reshape([1,n])
n11 = int(((cmem1 == cmem2).sum() - n) / 2)
n10 = int((cmem1.sum() - n) / 2) - n11
n01 = int((cmem2.sum() - n) / 2) - n11
n00 = n - n11 - n10 - n01
return n11, n10, n01, n00
编辑: 示例数据
import numpy as np
a1 = np.random.randint(0, 1868, 14440, dtype = np.int32)
a2 = np.random.randint(0, 484, 14440, dtype = np.int32)
# to have the members dicts used in example 2:
def get_cnum(a):
"""
Returns number of clusters.
"""
return len(np.unique(a))
def get_members(a):
"""
Returns a dict with cluster numbers as keys and member entities
as sorted numpy arrays.
"""
members = dict(map(lambda i: (i, []), range(max(a) + 1)))
list(map(lambda m: members[m[1]].append(m[0]),
enumerate(a)))
members = dict(map(lambda m:
(m[0], np.array(sorted(m[1]), dtype = np.int)),
members.items()))
return members
members1 = get_members(a1)
members2 = get_members(a2)
cnum1 = get_cnum(a1)
cnum2 = get_cnum(a2)
基于排序的方法有其优点,但可以做得更简单:
def pair_counts(a, b):
n = a.shape[0] # also b.shape[0]
counts_a = np.bincount(a)
counts_b = np.bincount(b)
sorter_a = np.argsort(a)
n11 = 0
same_a_offset = np.cumsum(counts_a)
for indices in np.split(sorter_a, same_a_offset):
b_check = b[indices]
n11 += np.count_nonzero(b_check == b_check[:,None])
n11 = (n11-n) // 2
n10 = (np.sum(counts_a**2) - n) // 2 - n11
n01 = (np.sum(counts_b**2) - n) // 2 - n11
n00 = n**2 - n - n11 - n10 - n01
return n11, n10, n01, n00
如果在 Cython 中有效地编码此方法,则可以获得另一个加速(可能 ~20x)。
编辑:
我找到了一种完全向量化过程并降低时间复杂度的方法:
def sizes2count(a, n):
return (np.inner(a, a) - n) // 2
def pair_counts_vec_nlogn(a, b):
# Size of "11" clusters (a[i]==a[j] & b[i]==b[j])
ab = a * b.max() + b # make sure max(a)*max(b) fits the dtype!
_, sizes = np.unique(ab, return_counts=True)
# Calculate the counts for each type of pairing
n = len(a) # also len(b)
n11 = sizes2count(sizes, n)
n10 = sizes2count(np.bincount(a), n) - n11
n01 = sizes2count(np.bincount(b), n) - n11
n00 = n**2 - n - n11 - n10 - n01
return n11, n10, n01, n00
def pair_counts_vec_linear(a, b):
# Label "11" clusters (a[i]==a[j] & b[i]==b[j])
ab = a * b.max() + b
# Calculate the counts for each type of pairing
n = len(a) # also len(b)
n11 = sizes2count(np.bincount(ab), n)
n10 = sizes2count(np.bincount(a), n) - n11
n01 = sizes2count(np.bincount(b), n) - n11
n00 = n**2 - n - n11 - n10 - n01
return n11, n10, n01, n00
有时 O(n log(n)) 算法比线性算法快,因为线性算法使用 max(a)*max(b)
存储。命名可能会改进,我不太熟悉术语。
要在线性时间内比较两个聚类 A
和 B
:
- 遍历
A
中的集群。设每个簇的大小为a_i
。A
中同一簇的总对数是所有a_i*(a_i-1)/2
. 的总和
- 根据
B
中的簇对每个 A 簇进行分区。设每个分区的大小为b_j
。A
和B
中同一簇中的对总数是所有b_j *(b_j-1)/2
. 的总数
- 两者之间的差异是在 A 而不是 B 中属于同一簇的对的总数
- 遍历
B
中的custers,得到B
中同一个簇中的总对数,用(2)的结果减去B
中的同一个簇中的对总数=11=] 但不是A
. - 以上3个结果的总和就是A或B中相同的对数。用n*(n-1)/2相减得到不同簇中的总对数在 A 和 B
步骤(2)中的分区是通过为B创建一个字典映射item -> cluster,然后在每个A-cluster中查找每个item来完成的。如果您要交叉比较大量聚类,只需为每个聚类计算一次这些映射并将它们保存在周围,就可以节省大量时间。
您不需要枚举和计数对。
相反,计算一个混淆矩阵,其中包含第一个聚类的每个聚类与第二个聚类的每个聚类的交集大小(这是对所有对象的一个循环),然后使用等式 n*(n-1)/2
.
这会将您的运行时间从 O(n^2) 减少到 O(n),因此它应该会给您带来 可观的 加速。