无法使用 pool.apply_async 通过多处理聚合结果
Unable to use pool.apply_async to aggregate results with multiprocessing
假设我有以下功能:
def fetch_num():
x = np.random.randint(low=0, high=1000000) # choose a number
for i in range(5000000): # do some calculations
j = i ** 2
return x # return a result
此函数选择一个随机数,然后进行一些计算,然后 returns 它。
我想创建一个包含所有这些结果的大列表。问题是,我不想处理同一个数字两次,我想使用 multiprocessing
来加快速度。
我试过以下代码:
import multiprocessing as mp
from tqdm import tqdm
from parallelizing_defs import fetch_num
import os
os.system("taskset -p 0xff %d" % os.getpid())
if __name__ == '__main__':
n = 10 # number of numbers that I want to gather
def collect_result(result): # a callback function - only append if it is not in the results list
if result not in results:
results.append(result)
pbar.update(1) # this is just for the fancy progress bar
def error_callback(e):
raise e
pool = mp.Pool(6) # create 6 workers
global results # initialize results list
results = []
pbar = tqdm(total=n) # initialize a progress bar
while len(results) < n: # work until enough results have been accumulated
pool.apply_async(fetch_num, args=(), callback=collect_result, error_callback=error_callback)
pool.close()
pool.join()
备注:
- 函数
fetch_num
是从另一个 python 文件导入的,因为我知道它在这个问题 的同一个文件中不起作用
- 怪异的
os
行,看完本期后补充:Why does multiprocessing use only a single core after I import numpy?
我的问题是:
- 循环不会停止,它会一直持续下去。
- 迭代速度不快,好像没用多核。
我尝试了很多其他配置,但似乎都不起作用。这听起来像是一种非常普遍的情况,但我一直无法找到该特定问题的示例。
任何关于为什么会发生这些行为的想法都将不胜感激。
您应该添加一个 error_callback
来显示来自子进程的错误,并减少预期结果(这样您就不会永远循环)或将错误向上推送以使脚本崩溃。
您有几个问题。首先,您需要包含 numpy
。但是你的大问题是:
while len(results) < n: # work until enough results have been accumulated
pool.apply_async(fetch_num, args=(), callback=collect_result, error_callback=error_callback)
您可以通过调用 apply_async
来提交这些作业,这比返回结果的速度更快,最终会提交太多作业。您需要准确提交 n
个作业,并注意确保不会在 fetch_num
中返回重复的结果。这样做的方法是使用一个可共享的集合来保存所有以前生成的数字。不幸的是,不存在可共享集。但我们确实有可用于此目的的共享词典。因此,我们使用一个指向可共享字典的代理指针和一个锁来初始化池中的每个进程,以序列化对字典的访问。
进程池函数 sucg 作为 fetch_num
确实必须导入,但 仅当您 运行 在类似 jupyter notebook
[=25= 的情况下].如果您是 运行 从命令行“正常”运行的程序,则不需要这样做。因此,我包含了内嵌的源代码,这样您可能会看到它。我还添加了一个打印语句,以便您可以看到所有 6 个进程都是 运行 并行的。
import multiprocessing as mp
import numpy as np
from tqdm import tqdm
def pool_init(the_dict, l):
global num_set, the_lock
num_set = the_dict
the_lock = l
def fetch_num():
the_lock.acquire()
print('fetch_num')
while True:
# get
x = np.random.randint(low=0, high=1000000) # choose a number
if x not in num_set:
num_set[x] = True
break
the_lock.release()
for i in range(5000000): # do some calculations
j = i ** 2
return x # return a result
if __name__ == '__main__':
with mp.Manager() as manager:
the_dict = manager.dict()
the_lock = mp.Lock()
n = 10 # number of numbers that I want to gather
results = []
def collect_result(result):
results.append(result)
pbar.update(1) # this is just for the fancy progress bar
pool = mp.Pool(6, initializer=pool_init, initargs=(the_dict, the_lock)) # create 6 workers
pbar = tqdm(total=n) # initialize a progress bar
for _ in range(n):
pool.apply_async(fetch_num, args=(), callback=collect_result)
pool.close()
pool.join()
print()
print(results)
您是否有特殊原因需要在执行处理的函数中生成数字? Python 和 NumPy 都有无需替换的采样方式,以便将一堆唯一的随机整数提供给您的进程池,而无需担心获取和释放锁。
>>> import numpy as np
>>> from concurrent.futures import ProcessPoolExecutor
>>> rng = np.random.default_rng()
>>> randoms = rng.choice(1000000, size=(10,), replace=False)
>>> randoms
array([908648, 947502, 510774, 272587, 362679, 529124, 42039, 912716,
921618, 581853])
>>> with ProcessPoolExecutor() as p:
... results = p.map(process_num, randoms)
假设我有以下功能:
def fetch_num():
x = np.random.randint(low=0, high=1000000) # choose a number
for i in range(5000000): # do some calculations
j = i ** 2
return x # return a result
此函数选择一个随机数,然后进行一些计算,然后 returns 它。
我想创建一个包含所有这些结果的大列表。问题是,我不想处理同一个数字两次,我想使用 multiprocessing
来加快速度。
我试过以下代码:
import multiprocessing as mp
from tqdm import tqdm
from parallelizing_defs import fetch_num
import os
os.system("taskset -p 0xff %d" % os.getpid())
if __name__ == '__main__':
n = 10 # number of numbers that I want to gather
def collect_result(result): # a callback function - only append if it is not in the results list
if result not in results:
results.append(result)
pbar.update(1) # this is just for the fancy progress bar
def error_callback(e):
raise e
pool = mp.Pool(6) # create 6 workers
global results # initialize results list
results = []
pbar = tqdm(total=n) # initialize a progress bar
while len(results) < n: # work until enough results have been accumulated
pool.apply_async(fetch_num, args=(), callback=collect_result, error_callback=error_callback)
pool.close()
pool.join()
备注:
- 函数
fetch_num
是从另一个 python 文件导入的,因为我知道它在这个问题 的同一个文件中不起作用
- 怪异的
os
行,看完本期后补充:Why does multiprocessing use only a single core after I import numpy?
我的问题是:
- 循环不会停止,它会一直持续下去。
- 迭代速度不快,好像没用多核。
我尝试了很多其他配置,但似乎都不起作用。这听起来像是一种非常普遍的情况,但我一直无法找到该特定问题的示例。 任何关于为什么会发生这些行为的想法都将不胜感激。
您应该添加一个 error_callback
来显示来自子进程的错误,并减少预期结果(这样您就不会永远循环)或将错误向上推送以使脚本崩溃。
您有几个问题。首先,您需要包含 numpy
。但是你的大问题是:
while len(results) < n: # work until enough results have been accumulated
pool.apply_async(fetch_num, args=(), callback=collect_result, error_callback=error_callback)
您可以通过调用 apply_async
来提交这些作业,这比返回结果的速度更快,最终会提交太多作业。您需要准确提交 n
个作业,并注意确保不会在 fetch_num
中返回重复的结果。这样做的方法是使用一个可共享的集合来保存所有以前生成的数字。不幸的是,不存在可共享集。但我们确实有可用于此目的的共享词典。因此,我们使用一个指向可共享字典的代理指针和一个锁来初始化池中的每个进程,以序列化对字典的访问。
进程池函数 sucg 作为 fetch_num
确实必须导入,但 仅当您 运行 在类似 jupyter notebook
[=25= 的情况下].如果您是 运行 从命令行“正常”运行的程序,则不需要这样做。因此,我包含了内嵌的源代码,这样您可能会看到它。我还添加了一个打印语句,以便您可以看到所有 6 个进程都是 运行 并行的。
import multiprocessing as mp
import numpy as np
from tqdm import tqdm
def pool_init(the_dict, l):
global num_set, the_lock
num_set = the_dict
the_lock = l
def fetch_num():
the_lock.acquire()
print('fetch_num')
while True:
# get
x = np.random.randint(low=0, high=1000000) # choose a number
if x not in num_set:
num_set[x] = True
break
the_lock.release()
for i in range(5000000): # do some calculations
j = i ** 2
return x # return a result
if __name__ == '__main__':
with mp.Manager() as manager:
the_dict = manager.dict()
the_lock = mp.Lock()
n = 10 # number of numbers that I want to gather
results = []
def collect_result(result):
results.append(result)
pbar.update(1) # this is just for the fancy progress bar
pool = mp.Pool(6, initializer=pool_init, initargs=(the_dict, the_lock)) # create 6 workers
pbar = tqdm(total=n) # initialize a progress bar
for _ in range(n):
pool.apply_async(fetch_num, args=(), callback=collect_result)
pool.close()
pool.join()
print()
print(results)
您是否有特殊原因需要在执行处理的函数中生成数字? Python 和 NumPy 都有无需替换的采样方式,以便将一堆唯一的随机整数提供给您的进程池,而无需担心获取和释放锁。
>>> import numpy as np
>>> from concurrent.futures import ProcessPoolExecutor
>>> rng = np.random.default_rng()
>>> randoms = rng.choice(1000000, size=(10,), replace=False)
>>> randoms
array([908648, 947502, 510774, 272587, 362679, 529124, 42039, 912716,
921618, 581853])
>>> with ProcessPoolExecutor() as p:
... results = p.map(process_num, randoms)