python 脚本中的多处理函数
Multiprocessing function in python script
我正在尝试对脚本中的函数使用多重处理,我认为这是我的多重处理结构和语法的问题。这里的目标是遍历列表并从 13 个不同的空间地理数据库中复制出 raster/tiff 图像。输出和输入都保存在单独的地理数据库中,以加快处理速度并防止崩溃。这是我的示例脚本:
import arcpy, time, os
from arcpy import env
import multiprocessing
from multiprocessing import pool
def copy_rasters_3b(P6_GDBs, x, c):
mosaic_gdb = os.path.join('{}/mosaic_{}.gdb/{}'.format(P6_GDBs, x, c))
final_gdb = os.path.join('{}/final_{}.gdb/{}'.format(P6_GDBs, x, c))
final_tiff = os.path.join('{}/{}.tif'.format(P6_GDBs, c))
print "---Copying Rasters Started---"
start_time = time.time()
arcpy.CopyRaster_management(mosaic_gdb, final_gdb, "", "", "", "NONE", "NONE", "8_BIT_UNSIGNED", "NONE", "NONE", "", "NONE")
arcpy.CopyRaster_management(mosaic_gdb, final_tiff, "", "", "", "NONE", "NONE", "8_BIT_UNSIGNED", "NONE", "NONE", "TIFF", "NONE")
print("--- "+ c + " Complete %s seconds ---" % (time.time() - start_time))
### Main ###
def main():
P6_DIR= "D:/P6_SecondRun"
P6_GDBs= "D:/P6_GDBs"
Categories =['CRP', 'FORE', 'INR', 'IR', 'MO', 'PAS', 'TCI', 'TCT', 'TG', 'WAT', 'WLF', 'WLO', 'WLT']
rasters = defaultdict(list)
# Environments
arcpy.env.overwriteOutput = True
arcpy.env.snapRaster = "D:/A__Snap/Phase6_Snap.tif"
arcpy.env.outputCoordinateSystem = arcpy.SpatialReference(102039) #arcpy.SpatialReference(3857)
pool = multiprocessing.Pool(processes=5)
for c, x in zip(Categories, range(1,14)):
pool.map(copy_rasters_3b(P6_GDBs, x, c), 14)
pool.close()
############### EXECUTE MAIN() ###############
if __name__ == "__main__":
main()
此脚本在后台启动五个进程,但在前两个实例后最终失败。我是 运行 ArcMap 10.4 x64 并且使用 Python27-64 位。
首先,您应该使用一个函数和一个为该函数提供参数的可迭代对象调用 pool.map
(一次)。这样做的原因是,实际的函数调用是在 中 中进行的 sub-processes。相反,您实际上是直接调用函数并将其结果传递给 pool.map
,这没有用。
其次,您在循环内调用 pool.close
,然后在下一次迭代中再次调用 pool.map
。 pool 对象的模式是将所有作业提交给它。完成后调用 pool.close
等待所有作业完成。在这种情况下,将通过单个 .map
方法调用将所有作业分配给它。
您的代码可能如下所示:
def copy_rasters_3b(arg_tuple):
P6_GDBs, x, c = arg_tuple # Break apart argument tuple
... # Rest same as before
pool = multiprocessing.Pool(processes=5)
pool.map(copy_rasters_3b, [(P6_GDBs, x, c) for x, c in enumerate(Categories)])
pool.close()
分解列表推导式:Categories
是一个字符串列表,因此 enumerate(Categories)
生成一组带编号的元组 (N, member)
,其中 N 是列表中的位置,成员是列表中的值。我们将这两者与每个结果元组中的常量值 P6_GDBs 结合起来。那么理解的最终结果是一个三元组列表。
map
将在每次调用时将其可迭代参数中的单个元素提供给函数。但是,该单个元素是 3 元组,因此此处的函数需要将一个参数分解为其组成部分。
最后,您应该删除以下导入行,因为它令人困惑且不必要。在这里你导入了名称 pool
但稍后(从未使用过它),你通过将 pool
作为变量名称分配给它来用新值覆盖它:
from multiprocessing import pool
[...]
pool = multiprocessing.Pool(processes=5)
我正在尝试对脚本中的函数使用多重处理,我认为这是我的多重处理结构和语法的问题。这里的目标是遍历列表并从 13 个不同的空间地理数据库中复制出 raster/tiff 图像。输出和输入都保存在单独的地理数据库中,以加快处理速度并防止崩溃。这是我的示例脚本:
import arcpy, time, os
from arcpy import env
import multiprocessing
from multiprocessing import pool
def copy_rasters_3b(P6_GDBs, x, c):
mosaic_gdb = os.path.join('{}/mosaic_{}.gdb/{}'.format(P6_GDBs, x, c))
final_gdb = os.path.join('{}/final_{}.gdb/{}'.format(P6_GDBs, x, c))
final_tiff = os.path.join('{}/{}.tif'.format(P6_GDBs, c))
print "---Copying Rasters Started---"
start_time = time.time()
arcpy.CopyRaster_management(mosaic_gdb, final_gdb, "", "", "", "NONE", "NONE", "8_BIT_UNSIGNED", "NONE", "NONE", "", "NONE")
arcpy.CopyRaster_management(mosaic_gdb, final_tiff, "", "", "", "NONE", "NONE", "8_BIT_UNSIGNED", "NONE", "NONE", "TIFF", "NONE")
print("--- "+ c + " Complete %s seconds ---" % (time.time() - start_time))
### Main ###
def main():
P6_DIR= "D:/P6_SecondRun"
P6_GDBs= "D:/P6_GDBs"
Categories =['CRP', 'FORE', 'INR', 'IR', 'MO', 'PAS', 'TCI', 'TCT', 'TG', 'WAT', 'WLF', 'WLO', 'WLT']
rasters = defaultdict(list)
# Environments
arcpy.env.overwriteOutput = True
arcpy.env.snapRaster = "D:/A__Snap/Phase6_Snap.tif"
arcpy.env.outputCoordinateSystem = arcpy.SpatialReference(102039) #arcpy.SpatialReference(3857)
pool = multiprocessing.Pool(processes=5)
for c, x in zip(Categories, range(1,14)):
pool.map(copy_rasters_3b(P6_GDBs, x, c), 14)
pool.close()
############### EXECUTE MAIN() ###############
if __name__ == "__main__":
main()
此脚本在后台启动五个进程,但在前两个实例后最终失败。我是 运行 ArcMap 10.4 x64 并且使用 Python27-64 位。
首先,您应该使用一个函数和一个为该函数提供参数的可迭代对象调用 pool.map
(一次)。这样做的原因是,实际的函数调用是在 中 中进行的 sub-processes。相反,您实际上是直接调用函数并将其结果传递给 pool.map
,这没有用。
其次,您在循环内调用 pool.close
,然后在下一次迭代中再次调用 pool.map
。 pool 对象的模式是将所有作业提交给它。完成后调用 pool.close
等待所有作业完成。在这种情况下,将通过单个 .map
方法调用将所有作业分配给它。
您的代码可能如下所示:
def copy_rasters_3b(arg_tuple):
P6_GDBs, x, c = arg_tuple # Break apart argument tuple
... # Rest same as before
pool = multiprocessing.Pool(processes=5)
pool.map(copy_rasters_3b, [(P6_GDBs, x, c) for x, c in enumerate(Categories)])
pool.close()
分解列表推导式:Categories
是一个字符串列表,因此 enumerate(Categories)
生成一组带编号的元组 (N, member)
,其中 N 是列表中的位置,成员是列表中的值。我们将这两者与每个结果元组中的常量值 P6_GDBs 结合起来。那么理解的最终结果是一个三元组列表。
map
将在每次调用时将其可迭代参数中的单个元素提供给函数。但是,该单个元素是 3 元组,因此此处的函数需要将一个参数分解为其组成部分。
最后,您应该删除以下导入行,因为它令人困惑且不必要。在这里你导入了名称 pool
但稍后(从未使用过它),你通过将 pool
作为变量名称分配给它来用新值覆盖它:
from multiprocessing import pool
[...]
pool = multiprocessing.Pool(processes=5)