多重处理在 Python 结束后如何合并多个文件?

How to merge multiple files once multiprocessing has ended in Python?

在我的代码中,multiprocessing Process 用于同时生成多个 impdp 作业(导入),每个作业生成一个日志文件,动态名称为:

'/DP_IMP_' + DP_PDB_FULL_NAME[i] + '' + DP_WORKLOAD + '' + str(vardate) + '.log'

vardate = datetime.now().strftime("%d-%b-%Y-%I_%M_%S_%p")
tempfiles = []
for i in range((len(DP_PDB_FULL_NAME))):
        for DP_WORKLOAD in DP_WORKLOAD_NAME:
                 tempfiles.append(logdir + '/DP_IMP_' + DP_PDB_FULL_NAME[i] + '_' + DP_WORKLOAD  +  '_' + str(vardate) + '.log')
                 p1 = multiprocessing.Process(target=imp_workload, args=(DP_WORKLOAD, DP_DURATION_SECONDS, vardate, ))
                 p1.start()

我想在所有进程结束后将创建的所有日志文件合并到一个大的主日志文件中。但是,当我尝试在 (for i in range((len(DP_PDB_FULL_NAME))) 循环下使用类似的东西时:

with open('DATAPUMP_IMP_' + str(vardate) + '.log','wb') as wfd:
    for f in tempfiles:
        with open(f,'rb') as fd:
            shutil.copyfileobj(fd, wfd)

然后它会尝试在进程结束之前写入文件。

此处,DP_PDB_FULL_NAME 是多个数据库的列表,因此多个进程在多个数据库中同时产生。当我尝试在循环结束后添加 p1.join() 时,多重处理不会在多个数据库中发生。

那么,在完成所有单个进程后,我应该如何创建主日志文件?

您应该创建某种结构来存储所需的变量和进程句柄。在该循环之后用 join 阻塞,直到所有子进程完成,然后处理结果文件。

handles = []
for i in range(10):
    p = Process()
    p.start()
    handles.append(p)

for handle in handles:
    handle.join()

所以,我在第一个循环结束后添加了 p1.join(),现在可以正常工作了!

vardate = datetime.now().strftime("%d-%b-%Y-%I_%M_%S_%p")
tempfiles = []
for i in range((len(DP_PDB_FULL_NAME))):
        for DP_WORKLOAD in DP_WORKLOAD_NAME:
                 tempfiles.append(logdir + '/DP_IMP_' + DP_PDB_FULL_NAME[i] + '_' + DP_WORKLOAD  +  '_' + str(vardate) + '.log')
                 p1 = multiprocessing.Process(target=imp_workload, args=(DP_WORKLOAD, DP_DURATION_SECONDS, vardate, ))
                 p1.start()
p1.join()

with open('DATAPUMP_IMP_' + str(vardate) + '.log','wb') as wfd:
    for f in tempfiles:
        with open(f,'rb') as fd:

进一步解释一下,在上面的场景中添加join有3种情况,multiprocessing相应地工作。

  1. 在最里面的 for 循环中:

    因此,如果在此处添加 join,那么多处理将根本无法工作,因为它就在 proc.start()

    之后
for i in range((len(DP_PDB_FULL_NAME))):
        for DP_WORKLOAD in DP_WORKLOAD_NAME:
                 tempfiles.append(logdir + '/DP_IMP_' + str(vardate) + '.log')
                 p1 = multiprocessing.Process(target=imp_workload, args=(DP_WORKLOAD, ))
                 p1.start()
                 p1.join()
  1. 外层for循环内(最内层for循环外)

    在这里,多处理仅适用于内部循环,而不适用于多个数据库

for i in range((len(DP_PDB_FULL_NAME))):
        for DP_WORKLOAD in DP_WORKLOAD_NAME:
                 tempfiles.append(logdir + '/DP_IMP_' + str(vardate) + '.log')
                 p1 = multiprocessing.Process(target=imp_workload, args=(DP_WORKLOAD, ))
                 p1.start()
        p1.join()
  1. 在外部 for 循环之外

    这是正确的解决方案(如上所述),它在所有使用多处理的循环之外。