python 脚本中的多处理函数

Multiprocessing function in python script

我正在尝试对脚本中的函数使用多重处理,我认为这是我的多重处理结构和语法的问题。这里的目标是遍历列表并从 13 个不同的空间地理数据库中复制出 raster/tiff 图像。输出和输入都保存在单独的地理数据库中,以加快处理速度并防止崩溃。这是我的示例脚本:

import arcpy, time, os
from arcpy import env
import multiprocessing
from multiprocessing import pool

def copy_rasters_3b(P6_GDBs, x, c):
        mosaic_gdb = os.path.join('{}/mosaic_{}.gdb/{}'.format(P6_GDBs, x, c))
        final_gdb = os.path.join('{}/final_{}.gdb/{}'.format(P6_GDBs, x, c))
        final_tiff = os.path.join('{}/{}.tif'.format(P6_GDBs, c))
        print "---Copying Rasters Started---"
        start_time = time.time()
        arcpy.CopyRaster_management(mosaic_gdb, final_gdb, "", "", "", "NONE", "NONE", "8_BIT_UNSIGNED", "NONE", "NONE", "", "NONE")
        arcpy.CopyRaster_management(mosaic_gdb, final_tiff, "", "", "", "NONE", "NONE", "8_BIT_UNSIGNED", "NONE", "NONE", "TIFF", "NONE")
        print("--- "+ c + " Complete %s seconds ---" % (time.time() - start_time))

### Main ###
def main():
    P6_DIR= "D:/P6_SecondRun"
    P6_GDBs= "D:/P6_GDBs"
    Categories =['CRP', 'FORE', 'INR', 'IR', 'MO', 'PAS', 'TCI', 'TCT', 'TG', 'WAT', 'WLF', 'WLO', 'WLT']
    rasters = defaultdict(list)

    # Environments
    arcpy.env.overwriteOutput = True
    arcpy.env.snapRaster = "D:/A__Snap/Phase6_Snap.tif"
    arcpy.env.outputCoordinateSystem = arcpy.SpatialReference(102039) #arcpy.SpatialReference(3857)

    pool = multiprocessing.Pool(processes=5)

    for c, x in zip(Categories, range(1,14)):
        pool.map(copy_rasters_3b(P6_GDBs, x, c), 14)
        pool.close()

############### EXECUTE MAIN() ###############
if __name__ == "__main__":
    main()

此脚本在后台启动五个进程,但在前两个实例后最终失败。我是 运行 ArcMap 10.4 x64 并且使用 Python27-64 位。

首先,您应该使用一个函数和一个为该函数提供参数的可迭代对象调用 pool.map(一次)。这样做的原因是,实际的函数调用是在 中进行的 sub-processes。相反,您实际上是直接调用函数并将其结果传递给 pool.map,这没有用。

其次,您在循环内调用 pool.close,然后在下一次迭代中再次调用 pool.map。 pool 对象的模式是将所有作业提交给它。完成后调用 pool.close 等待所有作业完成。在这种情况下,将通过单个 .map 方法调用将所有作业分配给它。

您的代码可能如下所示:

def copy_rasters_3b(arg_tuple):
    P6_GDBs, x, c = arg_tuple    # Break apart argument tuple
    ... # Rest same as before

pool = multiprocessing.Pool(processes=5)
pool.map(copy_rasters_3b, [(P6_GDBs, x, c) for x, c in enumerate(Categories)])
pool.close()

分解列表推导式:Categories 是一个字符串列表,因此 enumerate(Categories) 生成一组带编号的元组 (N, member),其中 N 是列表中的位置,成员是列表中的值。我们将这两者与每个结果元组中的常量值 P6_GDBs 结合起来。那么理解的最终结果是一个三元组列表。

map 将在每次调用时将其可迭代参数中的单个元素提供给函数。但是,该单个元素是 3 元组,因此此处的函数需要将一个参数分解为其组成部分。

最后,您应该删除以下导入行,因为它令人困惑且不必要。在这里你导入了名称 pool 但稍后(从未使用过它),你通过将 pool 作为变量名称分配给它来用新值覆盖它:

from multiprocessing import pool
[...]
pool = multiprocessing.Pool(processes=5)