How to use multiprocessing in chronological order?
I have a Python script that has two processes:
- process 1: loading and unzipping files
- process 2: processing the files, doing some stuff with them.
Before implementing multiprocessing, the software seemed to do its work in chronological order: load all the zipped files, unzip them, then open them and do some stuff with them.
When I brought multiprocessing into the game, it now seems that while the files are still being loaded and unzipped, the process of opening and working with them has already started, so multiple processes are doing things at the same time. The problem is that when I run this code on big data (more than 100 files) I get problems with concurrent file access, which leads to PermissionError: [WinError 32] The process cannot access the file because it is being used by another process:
When I run the snippet on a small dataset (about 30 files) it seems to go okay, because the files get unzipped very fast, just in time for process 2 to start.
What I want: I want to keep the multiprocessing because it speeds things up, but I want process 2 to start only once all the files have been unzipped (i.e. process 1 has finished).
This is my code snippet:
import os
import csv
import collections
import datetime
import zipfile
import re
import shutil
import fnmatch
from pathlib import Path
import ntpath
import configparser
from multiprocessing import Pool
def generate_file_lists():
    # Change the following line to a real path
    data_files = r'c:\desktop\DataEnergy'
    pattern = '*.zip'
    last_root = None
    args = []
    for root, dirs, files in os.walk(data_files):
        for filename in fnmatch.filter(files, pattern):
            if root != last_root:
                last_root = root
                if args:
                    yield args
                    args = []
            args.append((root, filename))
    if args:
        yield args
def unzip(file_list):
    """
    file_list is a list of (root, filename) tuples where
    root is the same for all tuples.
    """
    # Change the following line to a real path:
    counter_part = r'c:\desktop\CounterPart'
    for root, filename in file_list:
        path = os.path.join(root, filename)
        date_zipped_file_s = re.search(r'-(.\d+)-', filename).group(1)
        date_zipped_file = datetime.datetime.strptime(date_zipped_file_s, '%Y%m%d').date()
        # Create the new directory location
        new_dir = os.path.normpath(os.path.join(os.path.relpath(path, start=r'c:\desktop\DataEnergy'), ".."))
        # Join the directory name counter_part and create the path.
        new = os.path.join(counter_part, new_dir)
        # Create the directories
        if not os.path.exists(new):
            os.makedirs(new)
        zipfile.ZipFile(path).extractall(new)
        # Get all the unzipped files
        files = os.listdir(new)
        # Rename all the files in the created directories
        for file in files:
            filesplit = os.path.splitext(os.path.basename(file))
            if not re.search(r'_\d{8}.', file):
                os.rename(os.path.join(new, file), os.path.join(new, filesplit[0] + '_' + date_zipped_file_s + filesplit[1]))
# Required for Windows:
if __name__ == '__main__':
    pool = Pool(13)
    pool.map(unzip, generate_file_lists())
    print('the files have been unzipped!')

# Start process 2
all_missing_dates = ['20210701', '20210702']
missing_dates = [datetime.datetime.strptime(i, "%Y%m%d").date() for i in all_missing_dates]
dates_to_process = []
root = Path(r'.\middle_stage').resolve()
at_set = {'Audi', 'Mercedes', 'Volkswagen'}

# Only read the rows if they fulfill the following conditions.
def filter_row(r, missing_date):
    if set(r).intersection(at_set):
        if len(r) > 24 and r[24].isdigit():
            aantal_pplgs = int(r[24])
            date_time = datetime.datetime.fromisoformat(r[0]) if len(r) > 3 else True
            condition_3 = date_time.date() == missing_date if len(r) > 3 else True
            return condition_3
    return False

# Open the files and read the rows
print("Start reading data")
data_per_date = dict()
for missing_date in missing_dates:
    print("\tReading missing date: ", missing_date)
    files = [fn for fn in (e for e in root.glob(f"**/*_{missing_date:%Y%m%d}.txt") if e.is_file())]
    if len(files) != 13:
        continue
    dates_to_process.append(missing_date)
    vehicle_loc_dict = collections.defaultdict(list)
    for file in files:
        with open(file, 'r') as log_file:
            reader = csv.reader(log_file, delimiter=',')
            next(reader)  # skip header
            for row in reader:
                if filter_row(row, missing_date):
                    print('filter_row has been executed!')
    data_per_date[missing_date] = vehicle_loc_dict
Main thread
In the main thread we need to set up the queues and add the zipped files to the queue:
import threading
import queue

zippedQueue = queue.Queue()
unzippedQueue = queue.Queue()
zippedLock = threading.Lock()

# 'files' is assumed to be your list of zipped file paths
for file in files:
    zippedQueue.put(file)
Worker thread
class ChiSquaredThread(threading.Thread):
    def __init__(self):
        threading.Thread.__init__(self)

    def run(self):
        unzipFile()

def unzipFile():
    # keep taking zipped files from the zippedQueue until it is empty
    while True:
        zippedLock.acquire()
        try:
            if zippedQueue.empty():
                return
            zipped = zippedQueue.get()
        finally:
            zippedLock.release()
        # unzip the zipped file here to obtain unzippedFile,
        # then add the unzipped file to the queue
        unzippedQueue.put(unzippedFile)
Then create a block similar to this worker-thread block that performs the same steps and processes the files. The sample worker block above should guide you; a sketch of the missing pieces follows.
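As a minimal sketch (not part of the original answer): a second, hypothetical worker class ProcessFileThread consumes from unzippedQueue, and the start/join wiring ensures all unzipping finishes before any processing begins. The process_file function is a made-up stand-in for your own "process 2" logic:

class ProcessFileThread(threading.Thread):
    def run(self):
        while True:
            try:
                # Non-blocking: the queue is fully populated before we start.
                unzipped = unzippedQueue.get_nowait()
            except queue.Empty:
                break
            process_file(unzipped)  # hypothetical stand-in for your 'process 2' logic

# Start the unzip workers and wait for all of them to finish...
unzip_workers = [ChiSquaredThread() for _ in range(4)]
for w in unzip_workers:
    w.start()
for w in unzip_workers:
    w.join()

# ...and only then start the processing workers, so process 2
# never touches a file that is still being unzipped.
process_workers = [ProcessFileThread() for _ in range(4)]
for w in process_workers:
    w.start()
for w in process_workers:
    w.join()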
The problem seems to be the following:
If you are running under Windows (and based on your directory names, that appears to be the case), then whenever you create a new process (and here you will be creating 13 new processes by creating a multiprocessing pool), the spawn method of process creation is used. This means that a new, empty address space is created, a new Python interpreter is launched in it, and your source program is re-executed from the top in order to initialize the address space, executing every statement at global scope except statements inside blocks that begin with if __name__ == '__main__':, because in these new processes that condition will be False. This is also why you put the code that creates the new processes inside such a block, namely so that you do not get into a recursive loop creating new processes ad infinitum.
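To see this re-execution for yourself, here is a small toy script (not from the original answer; the file name spawn_demo.py is made up). Run on Windows, the unguarded print fires once in the parent and once more in every spawned worker, while the guarded print fires only in the parent:

# spawn_demo.py
from multiprocessing import Pool

# Global scope: re-executed in every spawned worker process.
print('initializing address space')

def square(x):
    return x * x

if __name__ == '__main__':
    # Guarded: runs only in the parent process.
    print('parent only')
    with Pool(2) as pool:
        print(pool.map(square, [1, 2, 3]))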
That said, your so-called process 2 statements are at global scope and not inside an if __name__ == '__main__': block, so they are executed, 13 times in parallel, while the multiprocessing pool is being initialized. But I can picture a scenario where pool process 1 executes that code to no effect, because nothing has been unzipped yet, and then, now that it has been initialized, starts unzipping files. Meanwhile the other pool processes start running their own initialization code, and now there is a conflict.
The solution is to move the process 2 code as follows:
import os
import csv
import collections
import datetime
import zipfile
import re
import shutil
import fnmatch
from pathlib import Path
import ntpath
import configparser
from multiprocessing import Pool
def generate_file_lists():
    # Change the following line to a real path
    data_files = r'c:\desktop\DataEnergy'
    pattern = '*.zip'
    last_root = None
    args = []
    for root, dirs, files in os.walk(data_files):
        for filename in fnmatch.filter(files, pattern):
            if root != last_root:
                last_root = root
                if args:
                    yield args
                    args = []
            args.append((root, filename))
    if args:
        yield args
def unzip(file_list):
    """
    file_list is a list of (root, filename) tuples where
    root is the same for all tuples.
    """
    # Change the following line to a real path:
    counter_part = r'c:\desktop\CounterPart'
    for root, filename in file_list:
        path = os.path.join(root, filename)
        date_zipped_file_s = re.search(r'-(.\d+)-', filename).group(1)
        date_zipped_file = datetime.datetime.strptime(date_zipped_file_s, '%Y%m%d').date()
        # Create the new directory location
        new_dir = os.path.normpath(os.path.join(os.path.relpath(path, start=r'c:\desktop\DataEnergy'), ".."))
        # Join the directory name counter_part and create the path.
        new = os.path.join(counter_part, new_dir)
        # Create the directories
        if not os.path.exists(new):
            os.makedirs(new)
        zipfile.ZipFile(path).extractall(new)
        # Get all the unzipped files
        files = os.listdir(new)
        # Rename all the files in the created directories
        for file in files:
            filesplit = os.path.splitext(os.path.basename(file))
            if not re.search(r'_\d{8}.', file):
                os.rename(os.path.join(new, file), os.path.join(new, filesplit[0] + '_' + date_zipped_file_s + filesplit[1]))
def process1():
    pool = Pool(13)
    pool.map(unzip, generate_file_lists())
    print('the files have been unzipped!')
# Only read the rows if they fulfill the following conditions.
def filter_row(r, missing_date, at_set):
    if set(r).intersection(at_set):
        if len(r) > 24 and r[24].isdigit():
            aantal_pplgs = int(r[24])
            date_time = datetime.datetime.fromisoformat(r[0]) if len(r) > 3 else True
            condition_3 = date_time.date() == missing_date if len(r) > 3 else True
            return condition_3
    return False
def process2():
    # Start process 2
    all_missing_dates = ['20210701', '20210702']
    missing_dates = [datetime.datetime.strptime(i, "%Y%m%d").date() for i in all_missing_dates]
    dates_to_process = []
    root = Path(r'.\middle_stage').resolve()
    at_set = {'Audi', 'Mercedes', 'Volkswagen'}
    # Open the files and read the rows
    print("Start reading data")
    data_per_date = dict()
    for missing_date in missing_dates:
        print("\tReading missing date: ", missing_date)
        files = [fn for fn in (e for e in root.glob(f"**/*_{missing_date:%Y%m%d}.txt") if e.is_file())]
        if len(files) != 13:
            continue
        dates_to_process.append(missing_date)
        vehicle_loc_dict = collections.defaultdict(list)
        for file in files:
            with open(file, 'r') as log_file:
                reader = csv.reader(log_file, delimiter=',')
                next(reader)  # skip header
                for row in reader:
                    # at_set is passed explicitly because it is local to process2
                    if filter_row(row, missing_date, at_set):
                        print('filter_row has been executed!')
        data_per_date[missing_date] = vehicle_loc_dict
def main():
    process1()
    process2()

if __name__ == '__main__':
    main()
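A note on why the ordering is now guaranteed: Pool.map blocks until every file list has been unzipped, so by the time process1() returns, process 1 is completely done and process2() can safely start. As a minor stylistic variant (my own suggestion, not part of the original answer), process1 could also manage the pool with a context manager so it is closed and joined deterministically:

def process1():
    # map() blocks until all unzip tasks are done; the 'with' block
    # then closes and joins the pool automatically.
    with Pool(13) as pool:
        pool.map(unzip, generate_file_lists())
    print('the files have been unzipped!')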