for嵌套循环的多重处理
Multiprocessing of for nested loops
我正在尝试执行以下代码,以在最后保存列表 data_set_N。
import re
data_set_N=[]
for i_1 in list(range(10))+list("+-"):
for i_2 in list(range(10))+list("+-*/")+["=="]:
for i_3 in list(range(10))+list("+-*/")+["=="]:
for i_4 in list(range(10))+list("+-*/")+["=="]:
for i_5 in list(range(10))+list("+-*/")+["=="]:
for i_6 in list(range(10))+list("+-*/")+["=="]:
for i_7 in list(range(10))+list("+-*/")+["=="]:
for i_8 in list(range(10)):
try :
value= str(i_1)+str(i_2)+str(i_3)+str(i_4)+str(i_5)+str(i_6)+str(i_7)+str(i_8)
valuev=re.sub(r'\b0+(?!\b)', '', value)
evaluation = eval(valuev)
if type(evaluation) == type(True) and evaluation and "//" not in value:
data_set_N.append(value)
except:
continue
print(len(data_set_N))
问题是需要 50 多个小时,第一个 i_1 需要 4.5 小时。
为了让 data_set_N
更快,我想使用多处理。我的想法是使用这样的东西:
from multiprocessing import Process, Manager
import itertools
import re
def add_value(data_set_N,paramlist):
#I am not sure if this function is well defined
try
i_1,i_2,i_3,i_4 = paramlist[0],paramlist[1],paramlist[2],paramlist[3]
i_5,i_6,i_7,i_8 = paramlist[4],paramlist[5],paramlist[6],paramlist[7]
value = str(i_1)+str(i_2)+str(i_3)+str(i_4)+str(i_5)+str(i_6)+str(i_7)+str(i_8)
valuev =re.sub(r'\b0+(?!\b)', '', value)
evaluation = eval(valuev)
if type(evaluation) == type(True) and evaluation and "//" not in value:
data_set_N.append(value)
except:
return
data_set_N = []
#Generate values for each parameter
I_1 = list(range(10))+list("+-")
I_2 = list(range(10))+list("+-*/")+["=="]
I_3 = list(range(10))+list("+-*/")+["=="]
I_4 = list(range(10))+list("+-*/")+["=="]
I_5 = list(range(10))+list("+-*/")+["=="]
I_6 = list(range(10))+list("+-*/")+["=="]
I_7 = list(range(10))+list("+-*/")+["=="]
I_8 = list(range(10))
paramlist = list(itertools.product(I_1,I_2,I_3,I_4,I_5,I_6,I_7,I_8))
if __name__ == "__main__":
with Manager() as manager:
data_set_N = manager.list() # <-- can be shared between processes.
processes = []
for i in range(10): #os.cpu_count() - 2 =10 , this range can be changed
p = Process(target=add_value, args=(data_set_N,paramlist)) # Passing the list
p.start()
processes.append(p)
for p in processes:
pool.close()
pool.join()
data_set_N = list(data_set_N) #the final list
这里的问题是 paramlist
导致 MemoryError
(因为它的大小是 12x15^6x10)。
有没有办法使用多处理来更快地执行代码(大约 10 小时),同时避免内存问题?
我会做的是:
- 使用多处理池。
- 保持基本代码不变,除了 7 个嵌套循环(七个内部循环)并且每个提交的任务都在处理
i_1
列表中的一个值。
- 在评估
value
后立即移动便宜的 '//' in value
检查,并可能避免进行不必要的正则表达式替换和调用 eval
函数。
这将使用最多 12 个内核(i_1
列表的长度)如果您的计算机有它们。如果你的计算机有 12 个内核 and 它们都是物理内核 and 没有别的是 运行,我希望执行时间是减少了 12 倍。如果你有 6 个物理核心和 6 个逻辑核心,而其他进程是 运行,那么执行时间显然不会减少 12 倍(我无法预测多少)。但这是解决记忆问题的最简单方法。
如果你有比 12 个多得多的核心,那么你可以定义 process_value
以拥有 6 个最内层循环并使用方法 starmap
(iterable
的每个元素将是元组和 process_value
现在除了托管列表之外还有两个参数,i_1
和 i_2
),iterable 参数是i_1
列表和 i_2
列表。
from multiprocessing import Pool, Manager, cpu_count
from functools import partial
import re
def process_value(data_set_N, i_1):
for i_2 in list(range(10))+list("+-*/")+["=="]:
for i_3 in list(range(10))+list("+-*/")+["=="]:
for i_4 in list(range(10))+list("+-*/")+["=="]:
for i_5 in list(range(10))+list("+-*/")+["=="]:
for i_6 in list(range(10))+list("+-*/")+["=="]:
for i_7 in list(range(10))+list("+-*/")+["=="]:
for i_8 in list(range(10)):
try:
value = str(i_1)+str(i_2)+str(i_3)+str(i_4)+str(i_5)+str(i_6)+str(i_7)+str(i_8)
if '//' in value:
continue
valuev = re.sub(r'\b0+(?!\b)', '', value)
evaluation = eval(valuev)
if type(evaluation) == type(True) and evaluation:
data_set_N.append(value)
except:
continue
if __name__ == '__main__':
with Manager() as manager:
data_set_N = manager.list()
# The iterable is the i_1 list:
i_1_list = list(range(10))+list("+-")
POOL_SIZE = min(cpu_count(), len(i_1_list))
pool = Pool(POOL_SIZE)
pool.map(partial(process_value, data_set_N), i_1_list)
pool.close()
pool.join()
print(len(data_set_N))
我正在尝试执行以下代码,以在最后保存列表 data_set_N。
import re
data_set_N=[]
for i_1 in list(range(10))+list("+-"):
for i_2 in list(range(10))+list("+-*/")+["=="]:
for i_3 in list(range(10))+list("+-*/")+["=="]:
for i_4 in list(range(10))+list("+-*/")+["=="]:
for i_5 in list(range(10))+list("+-*/")+["=="]:
for i_6 in list(range(10))+list("+-*/")+["=="]:
for i_7 in list(range(10))+list("+-*/")+["=="]:
for i_8 in list(range(10)):
try :
value= str(i_1)+str(i_2)+str(i_3)+str(i_4)+str(i_5)+str(i_6)+str(i_7)+str(i_8)
valuev=re.sub(r'\b0+(?!\b)', '', value)
evaluation = eval(valuev)
if type(evaluation) == type(True) and evaluation and "//" not in value:
data_set_N.append(value)
except:
continue
print(len(data_set_N))
问题是需要 50 多个小时,第一个 i_1 需要 4.5 小时。
为了让 data_set_N
更快,我想使用多处理。我的想法是使用这样的东西:
from multiprocessing import Process, Manager
import itertools
import re
def add_value(data_set_N,paramlist):
#I am not sure if this function is well defined
try
i_1,i_2,i_3,i_4 = paramlist[0],paramlist[1],paramlist[2],paramlist[3]
i_5,i_6,i_7,i_8 = paramlist[4],paramlist[5],paramlist[6],paramlist[7]
value = str(i_1)+str(i_2)+str(i_3)+str(i_4)+str(i_5)+str(i_6)+str(i_7)+str(i_8)
valuev =re.sub(r'\b0+(?!\b)', '', value)
evaluation = eval(valuev)
if type(evaluation) == type(True) and evaluation and "//" not in value:
data_set_N.append(value)
except:
return
data_set_N = []
#Generate values for each parameter
I_1 = list(range(10))+list("+-")
I_2 = list(range(10))+list("+-*/")+["=="]
I_3 = list(range(10))+list("+-*/")+["=="]
I_4 = list(range(10))+list("+-*/")+["=="]
I_5 = list(range(10))+list("+-*/")+["=="]
I_6 = list(range(10))+list("+-*/")+["=="]
I_7 = list(range(10))+list("+-*/")+["=="]
I_8 = list(range(10))
paramlist = list(itertools.product(I_1,I_2,I_3,I_4,I_5,I_6,I_7,I_8))
if __name__ == "__main__":
with Manager() as manager:
data_set_N = manager.list() # <-- can be shared between processes.
processes = []
for i in range(10): #os.cpu_count() - 2 =10 , this range can be changed
p = Process(target=add_value, args=(data_set_N,paramlist)) # Passing the list
p.start()
processes.append(p)
for p in processes:
pool.close()
pool.join()
data_set_N = list(data_set_N) #the final list
这里的问题是 paramlist
导致 MemoryError
(因为它的大小是 12x15^6x10)。
有没有办法使用多处理来更快地执行代码(大约 10 小时),同时避免内存问题?
我会做的是:
- 使用多处理池。
- 保持基本代码不变,除了 7 个嵌套循环(七个内部循环)并且每个提交的任务都在处理
i_1
列表中的一个值。 - 在评估
value
后立即移动便宜的'//' in value
检查,并可能避免进行不必要的正则表达式替换和调用eval
函数。
这将使用最多 12 个内核(i_1
列表的长度)如果您的计算机有它们。如果你的计算机有 12 个内核 and 它们都是物理内核 and 没有别的是 运行,我希望执行时间是减少了 12 倍。如果你有 6 个物理核心和 6 个逻辑核心,而其他进程是 运行,那么执行时间显然不会减少 12 倍(我无法预测多少)。但这是解决记忆问题的最简单方法。
如果你有比 12 个多得多的核心,那么你可以定义 process_value
以拥有 6 个最内层循环并使用方法 starmap
(iterable
的每个元素将是元组和 process_value
现在除了托管列表之外还有两个参数,i_1
和 i_2
),iterable 参数是i_1
列表和 i_2
列表。
from multiprocessing import Pool, Manager, cpu_count
from functools import partial
import re
def process_value(data_set_N, i_1):
for i_2 in list(range(10))+list("+-*/")+["=="]:
for i_3 in list(range(10))+list("+-*/")+["=="]:
for i_4 in list(range(10))+list("+-*/")+["=="]:
for i_5 in list(range(10))+list("+-*/")+["=="]:
for i_6 in list(range(10))+list("+-*/")+["=="]:
for i_7 in list(range(10))+list("+-*/")+["=="]:
for i_8 in list(range(10)):
try:
value = str(i_1)+str(i_2)+str(i_3)+str(i_4)+str(i_5)+str(i_6)+str(i_7)+str(i_8)
if '//' in value:
continue
valuev = re.sub(r'\b0+(?!\b)', '', value)
evaluation = eval(valuev)
if type(evaluation) == type(True) and evaluation:
data_set_N.append(value)
except:
continue
if __name__ == '__main__':
with Manager() as manager:
data_set_N = manager.list()
# The iterable is the i_1 list:
i_1_list = list(range(10))+list("+-")
POOL_SIZE = min(cpu_count(), len(i_1_list))
pool = Pool(POOL_SIZE)
pool.map(partial(process_value, data_set_N), i_1_list)
pool.close()
pool.join()
print(len(data_set_N))