在 python 中并行化数组行相似度计算
Parallelizing array row similarity calculations in python
我有一个 large-ish 数组 artist_topic_probs
(112,312 行乘以 ~100 个特征列),我想计算随机行对(大样本)之间的成对余弦相似度从这个数组。这是我当前代码的相关部分
# the number of random pairs to check (10 million here)
random_sample_size=10000000
# I want to make sure they're unique, and that I'm never comparing a row to itself
# so I generate my set of comparisons like so:
np.random.seed(99)
comps = set()
while len(comps)<random_sample_size:
a = np.random.randint(0,112312)
b= np.random.randint(0,112312)
if a!=b:
comp = tuple(sorted([a,b]))
comps.add(comp)
# convert to list at the end to ensure sort order
# not positive if this is needed...I've seen conflicting opinions
comps = list(sorted(comps))
这会生成一个元组列表,其中每个元组都是我将计算相似度的两行。然后我就用一个简单的循环来计算所有的相似度:
c_dists = []
from scipy.spatial.distance import cosine
for a,b in comps:
c_dists.append(cosine(artist_topic_probs[a],artist_topic_probs[b]))
(当然,这里的cosine
给出了距离,而不是相似度,但是我们可以很容易地用sim = 1.0 - dist
。我在标题中使用了相似性,因为它是更常见的术语)
这很好用,但不是太快,我需要多次重复这个过程。我有 32 个核心可以使用,所以并行化似乎是一个不错的选择,但我不确定最好的方法。我的想法是这样的:
pool = mp.Pool(processes=32)
c_dists = [pool.apply(cosine, args=(artist_topic_probs[a],artist_topic_probs[b]))
for a,b in comps]
但是在我的笔记本电脑上用一些测试数据测试这种方法并没有奏效(它只是挂起,或者至少比简单循环花费的时间长得多,以至于我厌倦了等待并终止了它)。我担心矩阵的索引是某种瓶颈,但我不确定。关于如何有效地并行化这个(或以其他方式加快进程)的任何想法?
首先,您可能想在将来使用 itertools.combinations
和 random.sample
来获得唯一对,但由于内存问题,在这种情况下它不起作用。然后,多处理不是多线程,即产生一个新进程涉及巨大的系统开销。为每个单独的任务生成一个进程没有什么意义。一项任务必须非常值得花费开销来合理化启动一个新流程,因此您最好将所有工作分成单独的作业(分成与您要使用的核心数量一样多的部分)。然后,不要忘记 multiprocessing
实现序列化整个命名空间并将其加载到内存中 N 次,其中 N 是进程数。如果您没有足够的 RAM 来存储庞大阵列的 N 个副本,这可能会导致密集交换。因此,您可能希望减少内核数量。
已更新 以按照您的要求恢复初始订单。
我制作了一个相同向量的测试数据集,因此 cosine
必须 return 一个零向量。
from __future__ import division, print_function
import math
import multiprocessing as mp
from scipy.spatial.distance import cosine
from operator import itemgetter
import itertools
def worker(enumerated_comps):
return [(ind, cosine(artist_topic_probs[a], artist_topic_probs[b])) for ind, (a, b) in enumerated_comps]
def slice_iterable(iterable, chunk):
"""
Slices an iterable into chunks of size n
:param chunk: the number of items per slice
:type chunk: int
:type iterable: collections.Iterable
:rtype: collections.Generator
"""
_it = iter(iterable)
return itertools.takewhile(
bool, (tuple(itertools.islice(_it, chunk)) for _ in itertools.count(0))
)
# Test data
artist_topic_probs = [range(10) for _ in xrange(10)]
comps = tuple(enumerate([(1, 2), (1, 3), (1, 4), (1, 5)]))
n_cores = 2
chunksize = int(math.ceil(len(comps)/n_cores))
jobs = tuple(slice_iterable(comps, chunksize))
pool = mp.Pool(processes=n_cores)
work_res = pool.map_async(worker, jobs)
c_dists = map(itemgetter(1), sorted(itertools.chain(*work_res.get())))
print(c_dists)
输出:
[2.2204460492503131e-16, 2.2204460492503131e-16, 2.2204460492503131e-16, 2.2204460492503131e-16]
这些值非常接近于零。
P.S.
来自 multiprocessing.Pool.apply
文档
Equivalent of the apply()
built-in function. It blocks until the
result is ready, so apply_async()
is better suited for performing
work in parallel. Additionally, func is only executed in one of the
workers of the pool.
scipy.spatial.distance.cosine
,正如您在 link 之后看到的那样,在您的计算中引入了显着的开销,因为对于每次调用,它都会计算您在每次调用时分析的两个向量的范数,对于您的样本大小
这相当于计算了 2000 万个范数,如果您提前记住约 10 万个向量的范数,您可以节省大约 60% 的计算时间,因为您有一个点积、u*v 和两个范数计算,并且每个这三个操作在操作数上大致相当。
此外,您正在使用显式循环,如果您可以将逻辑放在矢量化 numpy
运算符中,您可以 trim 另一大部分计算时间。
最后,你谈论余弦相似度...考虑scipy.spatial.distance.cosine
计算余弦距离,关系很简单,cs = cd - 1
但我没有在您发布的代码中看到这一点。
我有一个 large-ish 数组 artist_topic_probs
(112,312 行乘以 ~100 个特征列),我想计算随机行对(大样本)之间的成对余弦相似度从这个数组。这是我当前代码的相关部分
# the number of random pairs to check (10 million here)
random_sample_size=10000000
# I want to make sure they're unique, and that I'm never comparing a row to itself
# so I generate my set of comparisons like so:
np.random.seed(99)
comps = set()
while len(comps)<random_sample_size:
a = np.random.randint(0,112312)
b= np.random.randint(0,112312)
if a!=b:
comp = tuple(sorted([a,b]))
comps.add(comp)
# convert to list at the end to ensure sort order
# not positive if this is needed...I've seen conflicting opinions
comps = list(sorted(comps))
这会生成一个元组列表,其中每个元组都是我将计算相似度的两行。然后我就用一个简单的循环来计算所有的相似度:
c_dists = []
from scipy.spatial.distance import cosine
for a,b in comps:
c_dists.append(cosine(artist_topic_probs[a],artist_topic_probs[b]))
(当然,这里的cosine
给出了距离,而不是相似度,但是我们可以很容易地用sim = 1.0 - dist
。我在标题中使用了相似性,因为它是更常见的术语)
这很好用,但不是太快,我需要多次重复这个过程。我有 32 个核心可以使用,所以并行化似乎是一个不错的选择,但我不确定最好的方法。我的想法是这样的:
pool = mp.Pool(processes=32)
c_dists = [pool.apply(cosine, args=(artist_topic_probs[a],artist_topic_probs[b]))
for a,b in comps]
但是在我的笔记本电脑上用一些测试数据测试这种方法并没有奏效(它只是挂起,或者至少比简单循环花费的时间长得多,以至于我厌倦了等待并终止了它)。我担心矩阵的索引是某种瓶颈,但我不确定。关于如何有效地并行化这个(或以其他方式加快进程)的任何想法?
首先,您可能想在将来使用 itertools.combinations
和 random.sample
来获得唯一对,但由于内存问题,在这种情况下它不起作用。然后,多处理不是多线程,即产生一个新进程涉及巨大的系统开销。为每个单独的任务生成一个进程没有什么意义。一项任务必须非常值得花费开销来合理化启动一个新流程,因此您最好将所有工作分成单独的作业(分成与您要使用的核心数量一样多的部分)。然后,不要忘记 multiprocessing
实现序列化整个命名空间并将其加载到内存中 N 次,其中 N 是进程数。如果您没有足够的 RAM 来存储庞大阵列的 N 个副本,这可能会导致密集交换。因此,您可能希望减少内核数量。
已更新 以按照您的要求恢复初始订单。
我制作了一个相同向量的测试数据集,因此 cosine
必须 return 一个零向量。
from __future__ import division, print_function
import math
import multiprocessing as mp
from scipy.spatial.distance import cosine
from operator import itemgetter
import itertools
def worker(enumerated_comps):
return [(ind, cosine(artist_topic_probs[a], artist_topic_probs[b])) for ind, (a, b) in enumerated_comps]
def slice_iterable(iterable, chunk):
"""
Slices an iterable into chunks of size n
:param chunk: the number of items per slice
:type chunk: int
:type iterable: collections.Iterable
:rtype: collections.Generator
"""
_it = iter(iterable)
return itertools.takewhile(
bool, (tuple(itertools.islice(_it, chunk)) for _ in itertools.count(0))
)
# Test data
artist_topic_probs = [range(10) for _ in xrange(10)]
comps = tuple(enumerate([(1, 2), (1, 3), (1, 4), (1, 5)]))
n_cores = 2
chunksize = int(math.ceil(len(comps)/n_cores))
jobs = tuple(slice_iterable(comps, chunksize))
pool = mp.Pool(processes=n_cores)
work_res = pool.map_async(worker, jobs)
c_dists = map(itemgetter(1), sorted(itertools.chain(*work_res.get())))
print(c_dists)
输出:
[2.2204460492503131e-16, 2.2204460492503131e-16, 2.2204460492503131e-16, 2.2204460492503131e-16]
这些值非常接近于零。
P.S.
来自 multiprocessing.Pool.apply
文档
Equivalent of the
apply()
built-in function. It blocks until the result is ready, soapply_async()
is better suited for performing work in parallel. Additionally, func is only executed in one of the workers of the pool.
scipy.spatial.distance.cosine
,正如您在 link 之后看到的那样,在您的计算中引入了显着的开销,因为对于每次调用,它都会计算您在每次调用时分析的两个向量的范数,对于您的样本大小
这相当于计算了 2000 万个范数,如果您提前记住约 10 万个向量的范数,您可以节省大约 60% 的计算时间,因为您有一个点积、u*v 和两个范数计算,并且每个这三个操作在操作数上大致相当。
此外,您正在使用显式循环,如果您可以将逻辑放在矢量化 numpy
运算符中,您可以 trim 另一大部分计算时间。
最后,你谈论余弦相似度...考虑scipy.spatial.distance.cosine
计算余弦距离,关系很简单,cs = cd - 1
但我没有在您发布的代码中看到这一点。