
How to use multiprocessing in a chronical order?

我有一个存在 2 个进程的 python 脚本:

  1. 进程 1:加载和解压缩文件
  2. 进程 2:处理文件,用它做一些事情。


所以我在游戏中引入了多重处理,现在似乎在加载和解压缩文件的同时,打开和处理文件的过程已经开始了。所以有多个进程同时做事情。问题是,当我 运行 大数据(超过 100 多个文件)的这段代码时,我遇到了并发文件访问问题。这导致 PermissionError: [WinError 32] The process cannot access the file because it is being used by another process: 当我 运行 小数据集(大约 30 个文件)上的片段时,它似乎没问题,因为文件解压缩的速度非常快,正好在进程 2 启动时。

我想要什么:我想保留多处理,因为它加快了速度,但我希望进程 2 仅在所有文件都已解压缩后才启动(例如进程 1已完成)。


import os
import csv
import collections
import datetime 
import zipfile
import re
import shutil
import fnmatch
from pathlib import Path
import ntpath
import configparser
from multiprocessing import Pool

def generate_file_lists():
    # Change the following line to a real path
    data_files = 'c:\desktop\DataEnergy'
    pattern = '*.zip'
    last_root = None
    args = []
    for root, dirs, files in os.walk(data_files):
        for filename in fnmatch.filter(files, pattern):
            if root != last_root:
                last_root = root
                if args:
                    yield args
                    args = []
            args.append((root, filename))
    if args:
        yield args

def unzip(file_list):
    file_list is a list of (root, filename) tuples where
    root is the same for all tuples.
    # Change the following line to a real path:
    counter_part = 'c:\desktop\CounterPart'
    for root, filename in file_list:
        path = os.path.join(root, filename)
        date_zipped_file_s = re.search('-(.\d+)-', filename).group(1)
        date_zipped_file = datetime.datetime.strptime(date_zipped_file_s, '%Y%m%d').date()

        #Create the new directory location
        new_dir = os.path.normpath(os.path.join(os.path.relpath(path, start='c:\desktop\DataEnergy'), ".."))

        #Join the directory names counter_part and create their paths.
        new = os.path.join(counter_part, new_dir)

        #Create the directories
        if (not os.path.exists(new)):

        #Get al the zipped files
        files = os.listdir(new)

        #Rename all the files in the created directories
        for file in files:
            filesplit = os.path.splitext(os.path.basename(file))
            if not re.search(r'_\d{8}.', file):
                os.rename(os.path.join(new, file), os.path.join(new, filesplit[0]+'_'+date_zipped_file_s+filesplit[1]))

# Required for Windows:
if __name__ == '__main__':
    pool = Pool(13)
    pool.map(unzip, generate_file_lists())
    print('the files have been unzipped!')

#Start proces 2 
all_missing_dates = ['20210701', '20210702']
missing_dates = [datetime.datetime.strptime(i, "%Y%m%d").date() for i in all_missing_dates]

dates_to_process = []
root = Path('.\middle_stage').resolve()

at_set = {'Audi', 'Mercedes', 'Volkswagen'}

#Only read the rows if they fulfill the following conditions.
def filter_row(r, missing_date): 
    if set(row).intersection(at_set):       
        if len(r) > 24 and r[24].isdigit():
            aantal_pplgs = int(r[24])  
            date_time = datetime.datetime.fromisoformat(r[0]) if len(r) > 3 else True
            condition_3 = date_time.date() == missing_date  if len(r) > 3 else True  
            return condition_3
    return False

#Open the files and read the rows
print("Start reading data")
data_per_date = dict()
for missing_date in missing_dates:
    print("\tReading missing date: ", missing_date)
    files=[fn for fn in (e for e in root.glob(f"**/*_{missing_date:%Y%m%d}.txt") if e.is_file())]
    if len(files) != 13:

    vehicle_loc_dict = collections.defaultdict(list)
    for file in files:      
        with open(file, 'r') as log_file:
            reader = csv.reader(log_file, delimiter = ',')
            next(reader) # skip header
            for row in reader:
                if filter_row(row, missing_date): 
                    print('filter_row has been executed!')

    data_per_date[missing_date] = vehicle_loc_dict 



import threading
import queue 

zippedQueue = queue.Queue()
unzippedQueue = queue.Queue()
zippedLock = threading.Lock()

for file in files:


class ChiSquaredThread(threading.Thread):
    def __init__(self):

    def run(self):

# add all your zipped files to the zippedQueue 
def unzipFile():
   if not zippedQueue.empty():
      zipped = zippedQueue.get()
      # unzip the zipped file here
      # add unziped file to queue



如果您在 Windows 下 运行(并且根据您的目录名称,情况似乎是这样),每当您创建一个新进程时(这里您将创建 13 个新进程)通过创建多处理池来处理),使用 spawn 创建进程的方法。这意味着创建了一个新的空地址 space,Python 解释器将在其中重新启动,并且您的源程序将从顶部重新执行以通过执行初始化地址 space全局范围内的所有语句 除了 if __name__ == '__main__': 开头的块内的任何语句,因为在这些新进程中,此条件将为 False。这也是为什么您将创建新进程的代码放在这样的块中,即这样您就不会进入创建新进程的递归循环 ad inifinitum.

就是说,您所谓的 process 2 语句在全局范围内而不在 if __name__ == '__main__': 块内,因此它们在初始化时并行执行了 13 次多处理池。但我可以想象一个场景,池中的进程 1 执行代码无效,因为尚未解压缩任何内容,然后现在它已经初始化,它开始解压缩文件。同时,池中的其他进程启动 运行 其初始化代码,现在存在冲突。


import os
import csv
import collections
import datetime 
import zipfile
import re
import shutil
import fnmatch
from pathlib import Path
import ntpath
import configparser
from multiprocessing import Pool

def generate_file_lists():
    # Change the following line to a real path
    data_files = 'c:\desktop\DataEnergy'
    pattern = '*.zip'
    last_root = None
    args = []
    for root, dirs, files in os.walk(data_files):
        for filename in fnmatch.filter(files, pattern):
            if root != last_root:
                last_root = root
                if args:
                    yield args
                    args = []
            args.append((root, filename))
    if args:
        yield args

def unzip(file_list):
    file_list is a list of (root, filename) tuples where
    root is the same for all tuples.
    # Change the following line to a real path:
    counter_part = 'c:\desktop\CounterPart'
    for root, filename in file_list:
        path = os.path.join(root, filename)
        date_zipped_file_s = re.search('-(.\d+)-', filename).group(1)
        date_zipped_file = datetime.datetime.strptime(date_zipped_file_s, '%Y%m%d').date()

        #Create the new directory location
        new_dir = os.path.normpath(os.path.join(os.path.relpath(path, start='c:\desktop\DataEnergy'), ".."))

        #Join the directory names counter_part and create their paths.
        new = os.path.join(counter_part, new_dir)

        #Create the directories
        if (not os.path.exists(new)):

        #Get al the zipped files
        files = os.listdir(new)

        #Rename all the files in the created directories
        for file in files:
            filesplit = os.path.splitext(os.path.basename(file))
            if not re.search(r'_\d{8}.', file):
                os.rename(os.path.join(new, file), os.path.join(new, filesplit[0]+'_'+date_zipped_file_s+filesplit[1]))

    return False

def process1():
    pool = Pool(13)
    pool.map(unzip, generate_file_lists())
    print('the files have been unzipped!')

#Only read the rows if they fulfill the following conditions.
def filter_row(r, missing_date): 
    if set(row).intersection(at_set):       
        if len(r) > 24 and r[24].isdigit():
            aantal_pplgs = int(r[24])  
            date_time = datetime.datetime.fromisoformat(r[0]) if len(r) > 3 else True
            condition_3 = date_time.date() == missing_date  if len(r) > 3 else True  
            return condition_3

def process2():   
    #Start proces 2 
    all_missing_dates = ['20210701', '20210702']
    missing_dates = [datetime.datetime.strptime(i, "%Y%m%d").date() for i in all_missing_dates]
    dates_to_process = []
    root = Path('.\middle_stage').resolve()   
    at_set = {'Audi', 'Mercedes', 'Volkswagen'}
    #Open the files and read the rows
    print("Start reading data")
    data_per_date = dict()
    for missing_date in missing_dates:
        print("\tReading missing date: ", missing_date)
        files=[fn for fn in (e for e in root.glob(f"**/*_{missing_date:%Y%m%d}.txt") if e.is_file())]
        if len(files) != 13:
        vehicle_loc_dict = collections.defaultdict(list)
        for file in files:      
            with open(file, 'r') as log_file:
                reader = csv.reader(log_file, delimiter = ',')
                next(reader) # skip header
                for row in reader:
                    if filter_row(row, missing_date): 
                        print('filter_row has been executed!')
        data_per_date[missing_date] = vehicle_loc_dict
def main():

if __name__ == '__main__':