在 Python 中保存由多处理的不同进程生成的文件的最安全方法是什么?
What is the safest method to save files generated by different processes with multiprocessing in Python?
我对使用 multiprocessing 包完全陌生。我已经构建了一个基于代理的模型,并希望 运行 并行使用不同参数进行大量模拟。我的模型采用 xml 文件,提取一些参数和 运行 模拟,然后生成两个 pandas 数据帧并将它们保存为 pickle 文件。
我正在尝试使用 multiprocessing.Process() class,但是这两个数据帧没有正确保存,而对于某些模拟,我得到一个数据帧,而其他人没有数据帧。
我是否为此类工作使用了正确的 class?使用多处理模块将模拟结果写入磁盘的最安全方法是什么?
我补充说,如果我用一个简单的循环按顺序启动模拟,我会得到正确的输出。
感谢支持
我添加了一个不可重现的代码示例,因为我无法共享由许多模块和 xml 个文件组成的模型。
import time
import multiprocessing
from model import ProtonOC
import random
import os
import numpy as np
import sys
sys.setrecursionlimit(100000)
def load(dir):
result = list()
names = list()
for filename in os.scandir(dir):
if filename.path.endswith('.xml'):
result.append(filename.path)
names.append(filename.name[:-4])
return result, names
def run(xml, name):
model = MYMODEL()
model.override_xml(xml)
model.run()
new_dir = os.path.join("C:\", name)
os.mkdir(new_dir)
model.datacollector.get_agent_vars_dataframe().to_pickle(os.path.join(new_dir, "agents" + ".pkl"))
model.datacollector.get_model_vars_dataframe().to_pickle(os.path.join(new_dir, "model" + ".pkl"))
if __name__ == '__main__':
paths, names = load("C:\") #The folder that contains xml files
processes = []
for path, name in zip(paths, names):
p = multiprocessing.Process(target=run, args=(path, name))
processes.append(p)
p.start()
for process in processes:
process.join()
我可以详细说明我的评论,但是,唉,看看你的代码,对你的模型一无所知,我看不出你提到的问题的明显原因。
我在评论中提到,我会根据您的处理是 I/O 绑定还是 CPU 绑定来使用线程池或处理器池,以便更好地控制 threads/processes 你创造。虽然创建线程的开销较小,但 Python 解释器将在同一进程中执行,因此在执行 Python 字节码时没有并行性,因为必须首先使用全局解释器锁 (GIL)获得。因此,出于这个原因,通常建议将处理器池用于 CPU 密集型作业。然而,当执行发生在 运行time 以 C 语言实现的库中时,例如 numpy
和 pandas
的情况,Python 解释器释放 GIL,你仍然可以和线程有很高的并行度。但我不知道 ProtonOC
class 实例进行了哪些自然处理。一些如果它显然是 I/O 相关。所以现在我会建议您首先尝试一个线程池,我为其任意设置了最大大小 20(我凭空得出的数字)。这里的问题是你正在对你的磁盘进行并发操作,我不知道太多线程是否会减慢磁盘操作(你有一个固态驱动器,手臂移动不是问题吗?)
如果您 运行 下面的代码示例将 MAX_CONCURRENCY
设置为 1,大概它应该可以工作。当然,这不是您的最终目标。但它演示了设置并发性是多么容易。
import time
from concurrent.futures import ThreadPoolExecutor as Executor
from model import ProtonOC
import random
import os
import numpy as np
import sys
sys.setrecursionlimit(100000)
def load(dir):
result = list()
names = list()
for filename in os.scandir(dir):
if filename.path.endswith('.xml'):
result.append(filename.path)
names.append(filename.name[:-4])
return result, names
def run(xml, name):
model = ProtonOC()
model.override_xml(xml)
model.run()
new_dir = os.path.join("C:\", name)
os.mkdir(new_dir)
model.datacollector.get_agent_vars_dataframe().to_pickle(os.path.join(new_dir, "agents.pkl"))
model.datacollector.get_model_vars_dataframe().to_pickle(os.path.join(new_dir, "model.pkl"))
if __name__ == '__main__':
paths, names = load("C:\") #The folder that contains xml files
MAX_CONCURRENCY = 20 # never more than this
N_WORKERS = min(len(paths), MAX_CONCURRENCY)
with Executor(max_workers=N_WORKERS) as executor:
executor.map(run, paths, names)
要使用进程池,更改:
from concurrent.futures import ThreadPoolExecutor as Executor
至:
from concurrent.futures import ProcessPoolExecutor as Executor
然后您可能希望更改 MAX_CONCURRENCY
。但是因为作业仍然涉及很多 I/O 并在执行此操作时放弃处理器 I/O,您可能会受益于此值大于您拥有的 CPU 的数量。
更新
使用 ThreadPoolExecutor
class 的 map
方法的替代方法是使用 submit
。这使您有机会根据个人作业提交处理任何异常:
if __name__ == '__main__':
paths, names = load("C:\") #The folder that contains xml files
MAX_CONCURRENCY = 20 # never more than this
N_WORKERS = min(len(paths), MAX_CONCURRENCY)
with Executor(max_workers=N_WORKERS) as executor:
futures = [executor.submit(run, path, name) for path, name in zip(paths, names)]
for future in futures:
try:
result = future.get() # return value from run, which is None
except Exception as e: # any exception run might have thrown
print(e) # handle this as you see fit
您应该知道,这会逐个提交作业,而 map
与 ProcessPoolExecutor
一起使用时,允许您指定 chunksize
参数。如果池大小为 N 和 M 个作业要提交,其中 M 远大于 N,将池中的每个进程一次放入工作队列 chunksize
个作业比一次放置一个作业更有效减少所需共享内存传输次数的时间。但是只要你使用的是线程池,这就没有关系。
我对使用 multiprocessing 包完全陌生。我已经构建了一个基于代理的模型,并希望 运行 并行使用不同参数进行大量模拟。我的模型采用 xml 文件,提取一些参数和 运行 模拟,然后生成两个 pandas 数据帧并将它们保存为 pickle 文件。 我正在尝试使用 multiprocessing.Process() class,但是这两个数据帧没有正确保存,而对于某些模拟,我得到一个数据帧,而其他人没有数据帧。 我是否为此类工作使用了正确的 class?使用多处理模块将模拟结果写入磁盘的最安全方法是什么? 我补充说,如果我用一个简单的循环按顺序启动模拟,我会得到正确的输出。 感谢支持
我添加了一个不可重现的代码示例,因为我无法共享由许多模块和 xml 个文件组成的模型。
import time
import multiprocessing
from model import ProtonOC
import random
import os
import numpy as np
import sys
sys.setrecursionlimit(100000)
def load(dir):
result = list()
names = list()
for filename in os.scandir(dir):
if filename.path.endswith('.xml'):
result.append(filename.path)
names.append(filename.name[:-4])
return result, names
def run(xml, name):
model = MYMODEL()
model.override_xml(xml)
model.run()
new_dir = os.path.join("C:\", name)
os.mkdir(new_dir)
model.datacollector.get_agent_vars_dataframe().to_pickle(os.path.join(new_dir, "agents" + ".pkl"))
model.datacollector.get_model_vars_dataframe().to_pickle(os.path.join(new_dir, "model" + ".pkl"))
if __name__ == '__main__':
paths, names = load("C:\") #The folder that contains xml files
processes = []
for path, name in zip(paths, names):
p = multiprocessing.Process(target=run, args=(path, name))
processes.append(p)
p.start()
for process in processes:
process.join()
我可以详细说明我的评论,但是,唉,看看你的代码,对你的模型一无所知,我看不出你提到的问题的明显原因。
我在评论中提到,我会根据您的处理是 I/O 绑定还是 CPU 绑定来使用线程池或处理器池,以便更好地控制 threads/processes 你创造。虽然创建线程的开销较小,但 Python 解释器将在同一进程中执行,因此在执行 Python 字节码时没有并行性,因为必须首先使用全局解释器锁 (GIL)获得。因此,出于这个原因,通常建议将处理器池用于 CPU 密集型作业。然而,当执行发生在 运行time 以 C 语言实现的库中时,例如 numpy
和 pandas
的情况,Python 解释器释放 GIL,你仍然可以和线程有很高的并行度。但我不知道 ProtonOC
class 实例进行了哪些自然处理。一些如果它显然是 I/O 相关。所以现在我会建议您首先尝试一个线程池,我为其任意设置了最大大小 20(我凭空得出的数字)。这里的问题是你正在对你的磁盘进行并发操作,我不知道太多线程是否会减慢磁盘操作(你有一个固态驱动器,手臂移动不是问题吗?)
如果您 运行 下面的代码示例将 MAX_CONCURRENCY
设置为 1,大概它应该可以工作。当然,这不是您的最终目标。但它演示了设置并发性是多么容易。
import time
from concurrent.futures import ThreadPoolExecutor as Executor
from model import ProtonOC
import random
import os
import numpy as np
import sys
sys.setrecursionlimit(100000)
def load(dir):
result = list()
names = list()
for filename in os.scandir(dir):
if filename.path.endswith('.xml'):
result.append(filename.path)
names.append(filename.name[:-4])
return result, names
def run(xml, name):
model = ProtonOC()
model.override_xml(xml)
model.run()
new_dir = os.path.join("C:\", name)
os.mkdir(new_dir)
model.datacollector.get_agent_vars_dataframe().to_pickle(os.path.join(new_dir, "agents.pkl"))
model.datacollector.get_model_vars_dataframe().to_pickle(os.path.join(new_dir, "model.pkl"))
if __name__ == '__main__':
paths, names = load("C:\") #The folder that contains xml files
MAX_CONCURRENCY = 20 # never more than this
N_WORKERS = min(len(paths), MAX_CONCURRENCY)
with Executor(max_workers=N_WORKERS) as executor:
executor.map(run, paths, names)
要使用进程池,更改:
from concurrent.futures import ThreadPoolExecutor as Executor
至:
from concurrent.futures import ProcessPoolExecutor as Executor
然后您可能希望更改 MAX_CONCURRENCY
。但是因为作业仍然涉及很多 I/O 并在执行此操作时放弃处理器 I/O,您可能会受益于此值大于您拥有的 CPU 的数量。
更新
使用 ThreadPoolExecutor
class 的 map
方法的替代方法是使用 submit
。这使您有机会根据个人作业提交处理任何异常:
if __name__ == '__main__':
paths, names = load("C:\") #The folder that contains xml files
MAX_CONCURRENCY = 20 # never more than this
N_WORKERS = min(len(paths), MAX_CONCURRENCY)
with Executor(max_workers=N_WORKERS) as executor:
futures = [executor.submit(run, path, name) for path, name in zip(paths, names)]
for future in futures:
try:
result = future.get() # return value from run, which is None
except Exception as e: # any exception run might have thrown
print(e) # handle this as you see fit
您应该知道,这会逐个提交作业,而 map
与 ProcessPoolExecutor
一起使用时,允许您指定 chunksize
参数。如果池大小为 N 和 M 个作业要提交,其中 M 远大于 N,将池中的每个进程一次放入工作队列 chunksize
个作业比一次放置一个作业更有效减少所需共享内存传输次数的时间。但是只要你使用的是线程池,这就没有关系。