for嵌套循环的多重处理

Multiprocessing of for nested loops

我正在尝试执行以下代码,以在最后保存列表 data_set_N。

import re
data_set_N=[]    
for i_1 in list(range(10))+list("+-"):
    for i_2 in list(range(10))+list("+-*/")+["=="]:
        for i_3 in list(range(10))+list("+-*/")+["=="]:
            for i_4 in list(range(10))+list("+-*/")+["=="]:
                for i_5 in list(range(10))+list("+-*/")+["=="]:
                    for i_6 in list(range(10))+list("+-*/")+["=="]:
                        for i_7 in list(range(10))+list("+-*/")+["=="]:
                            for i_8 in list(range(10)):
                                try :
                                    value= str(i_1)+str(i_2)+str(i_3)+str(i_4)+str(i_5)+str(i_6)+str(i_7)+str(i_8)
                                    valuev=re.sub(r'\b0+(?!\b)', '', value)
                                    evaluation = eval(valuev)
                                    if type(evaluation) == type(True) and evaluation and "//" not in value:
                                        data_set_N.append(value)

                                except:
                                    continue
print(len(data_set_N))

问题是需要 50 多个小时,第一个 i_1 需要 4.5 小时。 为了让 data_set_N 更快,我想使用多处理。我的想法是使用这样的东西:

from multiprocessing import Process, Manager
import itertools    
import re

def add_value(data_set_N,paramlist):
    #I am not sure if this function is well defined
    try 
        i_1,i_2,i_3,i_4 = paramlist[0],paramlist[1],paramlist[2],paramlist[3]
        i_5,i_6,i_7,i_8 = paramlist[4],paramlist[5],paramlist[6],paramlist[7]
        value = str(i_1)+str(i_2)+str(i_3)+str(i_4)+str(i_5)+str(i_6)+str(i_7)+str(i_8)
        valuev =re.sub(r'\b0+(?!\b)', '', value)
        evaluation = eval(valuev)
        if type(evaluation) == type(True) and evaluation and "//" not in value:
            data_set_N.append(value)

    except:
        return     
        
        
data_set_N = [] 
#Generate values for each parameter
I_1 = list(range(10))+list("+-")
I_2 = list(range(10))+list("+-*/")+["=="]
I_3 = list(range(10))+list("+-*/")+["=="]
I_4 = list(range(10))+list("+-*/")+["=="]
I_5 = list(range(10))+list("+-*/")+["=="]
I_6 = list(range(10))+list("+-*/")+["=="]
I_7 = list(range(10))+list("+-*/")+["=="]
I_8 = list(range(10))
paramlist = list(itertools.product(I_1,I_2,I_3,I_4,I_5,I_6,I_7,I_8))

if __name__ == "__main__":
    with Manager() as manager:
        data_set_N = manager.list()  # <-- can be shared between processes.
        processes = []

        for i in range(10): #os.cpu_count() - 2 =10 , this range can be changed
            p = Process(target=add_value, args=(data_set_N,paramlist))  # Passing the list
            p.start()
            processes.append(p)

        for p in processes:
            pool.close()
            pool.join()

        data_set_N = list(data_set_N) #the final list

这里的问题是 paramlist 导致 MemoryError(因为它的大小是 12x15^6x10)。

有没有办法使用多处理来更快地执行代码(大约 10 小时),同时避免内存问题?

我会做的是:

  1. 使用多处理池。
  2. 保持基本代码不变,除了 7 个嵌套循环(七个内部循环)并且每个提交的任务都在处理 i_1 列表中的一个值。
  3. 在评估 value 后立即移动便宜的 '//' in value 检查,并可能避免进行不必要的正则表达式替换和调用 eval 函数。

这将使用最多 12 个内核(i_1 列表的长度)如果您的计算机有它们。如果你的计算机有 12 个内核 and 它们都是物理内核 and 没有别的是 运行,我希望执行时间是减少了 12 倍。如果你有 6 个物理核心和 6 个逻辑核心,而其他进程是 运行,那么执行时间显然不会减少 12 倍(我无法预测多少)。但这是解决记忆问题的最简单方法。

如果你有比 12 个多得多的核心,那么你可以定义 process_value 以拥有 6 个最内层循环并使用方法 starmapiterable 的每个元素将是元组和 process_value 现在除了托管列表之外还有两个参数,i_1i_2),iterable 参数是i_1 列表和 i_2 列表。

from multiprocessing import Pool, Manager, cpu_count
from functools import partial
import re


def process_value(data_set_N, i_1):
    for i_2 in list(range(10))+list("+-*/")+["=="]:
        for i_3 in list(range(10))+list("+-*/")+["=="]:
            for i_4 in list(range(10))+list("+-*/")+["=="]:
                for i_5 in list(range(10))+list("+-*/")+["=="]:
                    for i_6 in list(range(10))+list("+-*/")+["=="]:
                        for i_7 in list(range(10))+list("+-*/")+["=="]:
                            for i_8 in list(range(10)):
                                try:
                                    value = str(i_1)+str(i_2)+str(i_3)+str(i_4)+str(i_5)+str(i_6)+str(i_7)+str(i_8)
                                    if '//' in value:
                                        continue
                                    valuev = re.sub(r'\b0+(?!\b)', '', value)
                                    evaluation = eval(valuev)
                                    if type(evaluation) == type(True) and evaluation:
                                        data_set_N.append(value)
                                except:
                                    continue

if __name__ == '__main__':
    with Manager() as manager:
        data_set_N = manager.list()
        # The iterable is the i_1 list:
        i_1_list = list(range(10))+list("+-")
        POOL_SIZE = min(cpu_count(), len(i_1_list))
        pool = Pool(POOL_SIZE)
        pool.map(partial(process_value, data_set_N), i_1_list)
        pool.close()
        pool.join()
        print(len(data_set_N))