无法使用 pool.apply_async 通过多处理聚合结果

Unable to use pool.apply_async to aggregate results with multiprocessing

假设我有以下功能:

def fetch_num():
    x = np.random.randint(low=0, high=1000000) # choose a number
    for i in range(5000000): # do some calculations
        j = i ** 2
    return x # return a result

此函数选择一个随机数,然后进行一些计算,然后 returns 它。

我想创建一个包含所有这些结果的大列表。问题是,我不想处理同一个数字两次,我想使用 multiprocessing 来加快速度。

我试过以下代码:

import multiprocessing as mp
from tqdm import tqdm
from parallelizing_defs import fetch_num
import os
os.system("taskset -p 0xff %d" % os.getpid())
if __name__ == '__main__':


    n = 10 # number of numbers that I want to gather

    def collect_result(result): # a callback function - only append if it is not in the results list
        if result not in results:
            results.append(result)
            pbar.update(1) # this is just for the fancy progress bar

    def error_callback(e):
        raise e

    pool = mp.Pool(6) # create 6 workers

    global results # initialize results list
    results = []
    pbar = tqdm(total=n) # initialize a progress bar
    while len(results) < n: # work until enough results have been accumulated
        pool.apply_async(fetch_num, args=(), callback=collect_result, error_callback=error_callback)
    pool.close() 
    pool.join()

备注:

我的问题是:

我尝试了很多其他配置,但似乎都不起作用。这听起来像是一种非常普遍的情况,但我一直无法找到该特定问题的示例。 任何关于为什么会发生这些行为的想法都将不胜感激。

您应该添加一个 error_callback 来显示来自子进程的错误,并减少预期结果(这样您就不会永远循环)或将错误向上推送以使脚本崩溃。

您有几个问题。首先,您需要包含 numpy。但是你的大问题是:

while len(results) < n: # work until enough results have been accumulated
    pool.apply_async(fetch_num, args=(), callback=collect_result, error_callback=error_callback)

您可以通过调用 apply_async 来提交这些作业,这比返回结果的速度更快,最终会提交太多作业。您需要准确提交 n 个作业,并注意确保不会在 fetch_num 中返回重复的结果。这样做的方法是使用一个可共享的集合来保存所有以前生成的数字。不幸的是,不存在可共享集。但我们确实有可用于此目的的共享词典。因此,我们使用一个指向可共享字典的代理指针和一个锁来初始化池中的每个进程,以序列化对字典的访问。

进程池函数 sucg 作为 fetch_num 确实必须导入,但 仅当您 运行 在类似 jupyter notebook[=25= 的情况下].如果您是 运行 从命令行“正常”运行的程序,则不需要这样做。因此,我包含了内嵌的源代码,这样您可能会看到它。我还添加了一个打印语句,以便您可以看到所有 6 个进程都是 运行 并行的。

import multiprocessing as mp
import numpy as np
from tqdm import tqdm


def pool_init(the_dict, l):
    global num_set, the_lock
    num_set = the_dict
    the_lock = l


def fetch_num():
    the_lock.acquire()
    print('fetch_num')
    while True:
        # get
        x = np.random.randint(low=0, high=1000000) # choose a number
        if x not in num_set:
            num_set[x] = True
            break
    the_lock.release()

    for i in range(5000000): # do some calculations
        j = i ** 2
    return x # return a result



if __name__ == '__main__':

    with mp.Manager() as manager:
        the_dict = manager.dict()
        the_lock = mp.Lock()
        n = 10 # number of numbers that I want to gather

        results = []
        def collect_result(result):
            results.append(result)
            pbar.update(1) # this is just for the fancy progress bar

        pool = mp.Pool(6, initializer=pool_init, initargs=(the_dict, the_lock)) # create 6 workers
        pbar = tqdm(total=n) # initialize a progress bar
        for _ in range(n):
            pool.apply_async(fetch_num, args=(), callback=collect_result)
        pool.close()
        pool.join()
        print()
        print(results)

您是否有特殊原因需要在执行处理的函数中生成数字? Python 和 NumPy 都有无需替换的采样方式,以便将一堆唯一的随机整数提供给您的进程池,而无需担心获取和释放锁。

>>> import numpy as np
>>> from concurrent.futures import ProcessPoolExecutor
>>> rng = np.random.default_rng()
>>> randoms = rng.choice(1000000, size=(10,), replace=False)
>>> randoms
array([908648, 947502, 510774, 272587, 362679, 529124,  42039, 912716,
       921618, 581853])
>>> with ProcessPoolExecutor() as p:
...     results = p.map(process_num, randoms)