Python: using multiprocessing to speed up merging Counters
I'm trying to build a very simple item recommendation system based on how many times items were purchased together, so first I created an item2item dictionary of Counters, like:
# people purchased A with B 4 times, A with C 3 times
item2item = {'A': Counter({'B': 4, 'C': 3}),
             'B': Counter({'A': 4, 'C': 2}),
             'C': Counter({'A': 3, 'B': 2})}
# recommend to a user who purchased A and C
samples_list = [['A', 'C'], ...]
So for samples = ['A', 'C'], I recommend the most common items of item2item['A'] + item2item['C'], as in the sketch below.
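For concreteness, the merge-and-rank step looks like this (a minimal sketch on the toy data above; top_k is just an illustrative parameter):

from collections import Counter

item2item = {'A': Counter({'B': 4, 'C': 3}),
             'B': Counter({'A': 4, 'C': 2}),
             'C': Counter({'A': 3, 'B': 2})}

samples = ['A', 'C']
combined = item2item['A'] + item2item['C']  # Counter addition merges the counts
top_k = 2  # illustrative: how many recommendations to return
print(combined.most_common(top_k))  # [('B', 6), ('C', 3)]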
However, this merging is heavy for large matrices, so I tried multiprocessing as follows:
from operator import add
from functools import reduce
from concurrent.futures import ProcessPoolExecutor
from collections import Counter
with ProcessPoolExecutor(max_workers=10) as pool:
    for samples in samples_list:
        # w/o PoolExecutor:
        # combined = reduce(add, [item2item[s] for s in samples], Counter())
        future = pool.submit(reduce, add, [item2item[s] for s in samples], Counter())
        combined = future.result()
However, this did not speed things up at all.
I suspect that the Counter in the reduce call is not shared across processes, as discussed in the answers to Python multiprocessing and a shared counter and in https://docs.python.org/3/library/multiprocessing.html#sharing-state-between-processes.
Any help is appreciated.
The call combined = future.result() blocks until the result is complete, so you do not submit the next task to the pool until the previous one has finished. In other words, you never have more than one subprocess running at a time. At the very least, you should change your code to:
with ProcessPoolExecutor(max_workers=10) as pool:
    the_futures = []
    # you need: from tqdm import tqdm
    for samples in tqdm(samples_list):
        future = pool.submit(reduce, add, [item2item[s] for s in samples], Counter())
        the_futures.append(future)  # save it
    results = [f.result() for f in the_futures]  # all the results
Or, alternatively:
with ProcessPoolExecutor(max_workers=10) as pool:
    the_futures = []
    for samples in tqdm(samples_list):
        future = pool.submit(reduce, add, [item2item[s] for s in samples], Counter())
        the_futures.append(future)  # save it
    # you need: from concurrent.futures import as_completed
    for future in as_completed(the_futures):  # not necessarily in the order of submission
        result = future.result()  # do something with this
Also, if you do not specify max_workers to the ProcessPoolExecutor constructor, it defaults to the number of processors on your machine. Nothing is gained by specifying a value greater than the number of processors you actually have.
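For reference, that default comes from os.cpu_count(); a minimal sketch showing the equivalence:

import os
from concurrent.futures import ProcessPoolExecutor

print(os.cpu_count())  # the default used when max_workers is omitted

# equivalent to ProcessPoolExecutor() with no arguments
with ProcessPoolExecutor(max_workers=os.cpu_count()) as pool:
    pass  # submit work here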
Update
If you want to process results as they complete and need a way to tie each result back to its original request, you can store the futures as keys in a dictionary whose values are the corresponding requests' arguments. In that case:
with ProcessPoolExecutor(max_workers=10) as pool:
    the_futures = {}
    for samples in tqdm(samples_list):
        future = pool.submit(reduce, add, [item2item[s] for s in samples], Counter())
        the_futures[future] = samples  # map future to request
    # you need: from concurrent.futures import as_completed
    for future in as_completed(the_futures):  # not necessarily in the order of submission
        samples = the_futures[future]  # the request
        result = future.result()  # the result
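As a side note, if you do not need completion-order processing, pool.map keeps the bookkeeping down and returns results in submission order. A minimal sketch, where merge_counters is a hypothetical top-level helper and the toy data stands in for the real structures:

from operator import add
from functools import reduce
from collections import Counter
from concurrent.futures import ProcessPoolExecutor

# toy data standing in for the large structures in the question
item2item = {'A': Counter({'B': 4, 'C': 3}),
             'B': Counter({'A': 4, 'C': 2}),
             'C': Counter({'A': 3, 'B': 2})}
samples_list = [['A', 'C'], ['B', 'C']]

def merge_counters(samples):
    # hypothetical helper; top-level so the pool can pickle it, and
    # item2item is found as a module global in each worker process
    return reduce(add, [item2item[s] for s in samples], Counter())

if __name__ == '__main__':
    with ProcessPoolExecutor(max_workers=10) as pool:
        # results come back in the same order as samples_list
        results = list(pool.map(merge_counters, samples_list))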