How to use multiprocessing in chronological order?

I have a Python script with two processes:

  1. Process 1: loading and unzipping files.
  2. Process 2: processing those files, doing some things with them.

Before I implemented multiprocessing, the software appeared to do its work in chronological order: load all the zipped files, unzip them, then open them to do some things with them.

So I introduced multiprocessing, and now it seems that while the files are still being loaded and unzipped, the process that opens and works with them has already started, so multiple processes are doing things at the same time. The problem is that when I run this code on a large dataset (more than 100 files) I get concurrent file access issues, which lead to: PermissionError: [WinError 32] The process cannot access the file because it is being used by another process. When I run the snippet on a small dataset (about 30 files) it seems to go fine, because the files are unzipped very quickly, just in time for process 2 to start.

What I want: I want to keep the multiprocessing because it speeds things up, but I want process 2 to start only once all the files have been unzipped (i.e. process 1 has finished).

This is my code snippet:

import os
import csv
import collections
import datetime 
import zipfile
import re
import shutil
import fnmatch
from pathlib import Path
import ntpath
import configparser
from multiprocessing import Pool

def generate_file_lists():
    # Change the following line to a real path
    data_files = r'c:\desktop\DataEnergy'
    pattern = '*.zip'
    last_root = None
    args = []
    for root, dirs, files in os.walk(data_files):
        for filename in fnmatch.filter(files, pattern):
            if root != last_root:
                last_root = root
                if args:
                    yield args
                    args = []
            args.append((root, filename))
    if args:
        yield args

def unzip(file_list):
    """
    file_list is a list of (root, filename) tuples where
    root is the same for all tuples.
    """
    # Change the following line to a real path:
    counter_part = 'c:\desktop\CounterPart'
    for root, filename in file_list:
        path = os.path.join(root, filename)
        date_zipped_file_s = re.search(r'-(.\d+)-', filename).group(1)
        date_zipped_file = datetime.datetime.strptime(date_zipped_file_s, '%Y%m%d').date()

        #Create the new directory location
        new_dir = os.path.normpath(os.path.join(os.path.relpath(path, start=r'c:\desktop\DataEnergy'), ".."))

        #Join the directory names counter_part and create their paths.
        new = os.path.join(counter_part, new_dir)

        #Create the directories
        if (not os.path.exists(new)):
            os.makedirs(new)
        zipfile.ZipFile(path).extractall(new)

        #Get all the unzipped files
        files = os.listdir(new)

        #Rename all the files in the created directories
        for file in files:
            filesplit = os.path.splitext(os.path.basename(file))
            if not re.search(r'_\d{8}.', file):
                os.rename(os.path.join(new, file), os.path.join(new, filesplit[0]+'_'+date_zipped_file_s+filesplit[1]))

# Required for Windows:
if __name__ == '__main__':
    pool = Pool(13)
    pool.map(unzip, generate_file_lists())
    print('the files have been unzipped!')


#Start process 2
all_missing_dates = ['20210701', '20210702']
missing_dates = [datetime.datetime.strptime(i, "%Y%m%d").date() for i in all_missing_dates]

dates_to_process = []
root = Path(r'.\middle_stage').resolve()


at_set = {'Audi', 'Mercedes', 'Volkswagen'}

#Only read the rows if they fulfill the following conditions.
def filter_row(r, missing_date): 
    if set(r).intersection(at_set):
        if len(r) > 24 and r[24].isdigit():
            aantal_pplgs = int(r[24])  
            date_time = datetime.datetime.fromisoformat(r[0]) if len(r) > 3 else True
            condition_3 = date_time.date() == missing_date  if len(r) > 3 else True  
            
            return condition_3
    return False

#Open the files and read the rows
print("Start reading data")
data_per_date = dict()
for missing_date in missing_dates:
    print("\tReading missing date: ", missing_date)
    files = [e for e in root.glob(f"**/*_{missing_date:%Y%m%d}.txt") if e.is_file()]
    if len(files) != 13:
        continue
    dates_to_process.append(missing_date)  

    vehicle_loc_dict = collections.defaultdict(list)
    for file in files:      
        with open(file, 'r') as log_file:
            reader = csv.reader(log_file, delimiter = ',')
            next(reader) # skip header
            for row in reader:
                if filter_row(row, missing_date): 
                    print('filter_row has been executed!')

    data_per_date[missing_date] = vehicle_loc_dict 

Main thread

In the main thread we need to set up the queues and add the zipped files to the zipped queue:

import threading
import queue 

zippedQueue = queue.Queue()
unzippedQueue = queue.Queue()
zippedLock = threading.Lock()

# 'files' is assumed to be the list of zipped files collected earlier
for file in files:
    zippedQueue.put(file)

Worker thread

class ChiSquaredThread(threading.Thread):
    def __init__(self):
        threading.Thread.__init__(self)

    def run(self):
        unzipFile()

# take one zipped file from the zippedQueue, unzip it and
# put the result on the unzippedQueue
def unzipFile():
    zippedLock.acquire()
    if zippedQueue.empty():
        # nothing left to do; release the lock so other threads can exit too
        zippedLock.release()
        return
    zipped = zippedQueue.get()
    zippedLock.release()
    # unzip the zipped file here
    unzipped_file = zipped  # placeholder for the actual unzip result
    # add the unzipped file to the queue
    unzippedQueue.put(unzipped_file)

Then create a similar block of worker threads that performs the same steps and actually processes the files. The sample worker-thread block above should point you in the right direction; a rough sketch is shown below.
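For illustration, here is a minimal sketch of how the two groups of threads could be wired together so that processing only starts once unzipping has finished. The ProcessingThread class and processFile are hypothetical placeholders, not part of the original answer:

class ProcessingThread(threading.Thread):
    def __init__(self):
        threading.Thread.__init__(self)

    def run(self):
        processNextFile()

def processNextFile():
    # take one unzipped file from the queue (get() blocks until
    # an item is available) and hand it to the processing logic
    unzipped = unzippedQueue.get()
    processFile(unzipped)  # hypothetical stand-in for the CSV-reading code

# one one-shot thread per file, mirroring the run() method above:
# start all unzip threads and wait until every archive is unzipped
unzip_threads = [ChiSquaredThread() for _ in range(zippedQueue.qsize())]
for t in unzip_threads:
    t.start()
for t in unzip_threads:
    t.join()

# only now start the processing threads, so that no file is opened
# before process 1 has completely finished
process_threads = [ProcessingThread() for _ in range(unzippedQueue.qsize())]
for t in process_threads:
    t.start()
for t in process_threads:
    t.join()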

The problem would appear to be the following:

If you are running under Windows (and based on your directory names, that appears to be the case), then whenever you create a new process (and here you are creating 13 new processes by creating a multiprocessing pool), the spawn method of creating processes is used. This means that a new, empty address space is created, a new Python interpreter is launched in it, and your source program is re-executed from the top in order to initialize the address space by executing all statements at global scope, except for any statements within a block that begins with if __name__ == '__main__':, because in these new processes that condition will be False. This is also why you place the code that creates the new processes within such a block, i.e. so that you do not get into a recursive loop creating new processes ad infinitum.
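You can see this re-execution for yourself with a small demonstration script (a minimal demo, not part of the original code):

from multiprocessing import Pool

# this line runs once in the parent and once more in every
# spawned worker, because each worker re-imports the module
print('module-level code executing')

def work(x):
    return x * x

if __name__ == '__main__':
    with Pool(2) as pool:
        # under Windows (spawn), 'module-level code executing' is
        # printed three times in total: parent + 2 workers
        print(pool.map(work, [1, 2, 3]))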

That said, your so-called process 2 statements are at global scope, not within an if __name__ == '__main__': block, and so they are executed 13 times in parallel as the multiprocessing pool is being initialized. I can, however, imagine a scenario where pool process 1 executes the process 2 code to no effect, since nothing has been unzipped yet, and then, once it has been initialized, begins to unzip files. Meanwhile, the other pool processes begin running their own initialization code and now there is a conflict.

The solution is to move the process 2 code into its own function that is only called after process 1 has finished, as follows. Note that Pool.map blocks until all submitted tasks have completed, so by the time process1() returns, every file has been unzipped:

import os
import csv
import collections
import datetime 
import zipfile
import re
import shutil
import fnmatch
from pathlib import Path
import ntpath
import configparser
from multiprocessing import Pool

def generate_file_lists():
    # Change the following line to a real path
    data_files = r'c:\desktop\DataEnergy'
    pattern = '*.zip'
    last_root = None
    args = []
    for root, dirs, files in os.walk(data_files):
        for filename in fnmatch.filter(files, pattern):
            if root != last_root:
                last_root = root
                if args:
                    yield args
                    args = []
            args.append((root, filename))
    if args:
        yield args

def unzip(file_list):
    """
    file_list is a list of (root, filename) tuples where
    root is the same for all tuples.
    """
    # Change the following line to a real path:
    counter_part = 'c:\desktop\CounterPart'
    for root, filename in file_list:
        path = os.path.join(root, filename)
        date_zipped_file_s = re.search(r'-(.\d+)-', filename).group(1)
        date_zipped_file = datetime.datetime.strptime(date_zipped_file_s, '%Y%m%d').date()

        #Create the new directory location
        new_dir = os.path.normpath(os.path.join(os.path.relpath(path, start=r'c:\desktop\DataEnergy'), ".."))

        #Join the directory names counter_part and create their paths.
        new = os.path.join(counter_part, new_dir)

        #Create the directories
        if (not os.path.exists(new)):
            os.makedirs(new)
        zipfile.ZipFile(path).extractall(new)

        #Get all the unzipped files
        files = os.listdir(new)

        #Rename all the files in the created directories
        for file in files:
            filesplit = os.path.splitext(os.path.basename(file))
            if not re.search(r'_\d{8}.', file):
                os.rename(os.path.join(new, file), os.path.join(new, filesplit[0]+'_'+date_zipped_file_s+filesplit[1]))


def process1():
    pool = Pool(13)
    pool.map(unzip, generate_file_lists())
    print('the files have been unzipped!')

#Only read the rows if they fulfill the following conditions.
def filter_row(r, missing_date, at_set):
    if set(r).intersection(at_set):
        if len(r) > 24 and r[24].isdigit():
            aantal_pplgs = int(r[24])
            date_time = datetime.datetime.fromisoformat(r[0]) if len(r) > 3 else True
            condition_3 = date_time.date() == missing_date if len(r) > 3 else True

            return condition_3
    return False

def process2():   
    #Start process 2
    all_missing_dates = ['20210701', '20210702']
    missing_dates = [datetime.datetime.strptime(i, "%Y%m%d").date() for i in all_missing_dates]
    
    dates_to_process = []
    root = Path(r'.\middle_stage').resolve()
    
    at_set = {'Audi', 'Mercedes', 'Volkswagen'}
    
    #Open the files and read the rows
    print("Start reading data")
    data_per_date = dict()
    for missing_date in missing_dates:
        print("\tReading missing date: ", missing_date)
        files = [e for e in root.glob(f"**/*_{missing_date:%Y%m%d}.txt") if e.is_file()]
        if len(files) != 13:
            continue
        dates_to_process.append(missing_date)  
    
        vehicle_loc_dict = collections.defaultdict(list)
        for file in files:      
            with open(file, 'r') as log_file:
                reader = csv.reader(log_file, delimiter = ',')
                next(reader) # skip header
                for row in reader:
                    if filter_row(row, missing_date, at_set):
                        print('filter_row has been executed!')
    
        data_per_date[missing_date] = vehicle_loc_dict
        
def main():
    process1()
    process2()

if __name__ == '__main__':
    main()
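
As a side note (not part of the original answer): since Python 3.3, multiprocessing.Pool can also be used as a context manager, which makes the pool cleanup explicit. process1 could equivalently be written as:

def process1():
    # map() blocks until every task has finished; the with-block
    # then terminates the pool on exit
    with Pool(13) as pool:
        pool.map(unzip, generate_file_lists())
    print('the files have been unzipped!')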