Save outputs to variable between multiprocessing processes
I'm looking for a simple solution to save the output to a single file or a single variable while using multiprocessing.
My output is a list, and when I try to join everything together at the end of the multiprocessing I get an error.
The alternative is to save a file at every step of the output function, but that uses a lot of memory.
Is there a way to save this list across multiple processes?
def calculate(data,ylat,xlon):
    output = []
    for j,i in data:
        ...
        output.append(lat,lon,area,fraction_area)
        L.append(output)
        print(lat,lon,area,fraction_area,file=f)
    return output
# Multiprocessing
# number of polygons is 1200000
f = open('name.txt','w')
with mp.Manager() as manager:
    L = manager.list()
    pool = mp.Pool()
    for index, polys in area_study.iterrows():
        # Limits of each polygon in the shapefile
        ylat = [ymin, ymax]
        xlon = [xmin, xmax]
        args.append((polys, ylat, xlon))
    p = pool.starmap(calculate, args)
    pool.close()
    pool.join()
See my comment above about appending to the managed list appearing to be an unnecessary operation.
I would use a generator function that yields successive (polys, ylat, xlon) tuples, together with the Pool.imap method (and an efficient chunksize argument). That way all the arguments are generated lazily as the run proceeds, and the return values from calculate can be handled in the main process, writing out the results as they become available. This is the general idea; adjust it to your needs.
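For illustration only, here is a minimal, self-contained sketch of that pattern; the per-item work and the generated arguments are placeholders, not your actual polygon computation:

import multiprocessing as mp

def work(t):
    x, y = t  # unpack; placeholder for the real per-polygon computation
    return x * y

def generate_args():
    # Lazily yields argument tuples instead of building one huge list up front
    for i in range(100):
        yield i, i + 1

if __name__ == '__main__':
    with mp.Pool() as pool, open('results.txt', 'w') as f:
        # Results arrive back in the main process, in submission order,
        # so the file can be written here without any shared state
        for result in pool.imap(work, generate_args(), chunksize=4):
            print(result, file=f)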
Note that in your function calculate the statement output.append(lat,lon,area,fraction_area) is invalid, because append takes only a single argument. In the code below I am assuming you were trying to append a tuple (or a list):
import multiprocessing as mp

def calculate(t):
    data, ylat, xlon = t  # unpack
    output = []
    for j, i in data:
        ...
        output.append((lat, lon, area, fraction_area))
    return output

def compute_chunksize(poolsize, iterable_size):
    chunksize, remainder = divmod(iterable_size, 4 * poolsize)
    if remainder:
        chunksize += 1
    return chunksize

def generate_polygons():
    for index, polys in area_study.iterrows():
        # Limits of each polygon in the shapefile
        ylat = [ymin, ymax]
        xlon = [xmin, xmax]
        yield polys, ylat, xlon

# Required for Windows
if __name__ == '__main__':
    # number of polygons is 1200000
    poolsize = mp.cpu_count()
    # Best guess as to the size of the iterable being passed to imap:
    iterable_size = 1_200_000
    chunksize = compute_chunksize(poolsize, iterable_size)
    with open('name.txt', 'w') as f:
        pool = mp.Pool(poolsize)
        # Each result is a list of tuples of lat, lon, area, fraction_area
        for result in pool.imap(calculate, generate_polygons(), chunksize=chunksize):
            for t in result:
                print(*t, file=f)
                # Or, equivalently:
                # lat, lon, area, fraction_area = t  # unpack
                # print(lat, lon, area, fraction_area, file=f)
        pool.close()
        pool.join()
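As a quick sanity check on the compute_chunksize heuristic (which splits the iterable into roughly 4 chunks per pool process), assuming, say, an 8-core machine:

# hypothetical values: iterable_size = 1_200_000, poolsize = 8
divmod(1_200_000, 4 * 8)  # -> (37500, 0), so chunksize = 37500

Each task sent to the pool then bundles 37,500 argument tuples, which keeps the inter-process communication overhead far lower than the default chunksize of 1 would.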