多处理池读取文件中的行
Multiprocessing pool to read lines in a file
我试图一次读取一个文件的多行,以便将这些行分成两个单独的列表。 cleanLine 函数本质上接受输入的行并对其进行清理,返回没有空格的行。现在我的代码编译 returns 与没有多处理的结果相同,但是,脚本的总体 运行 时间没有改善所以我不确定它是否真的同时产生多个进程或者是否它只是一次做一个。在这种特定情况下,我不太确定如何判断它实际上是在创建多个进程还是只创建一个。有什么原因导致脚本的这一部分没有 运行 更快,还是我做错了?任何帮助或反馈将不胜感激。
代码片段:
import multiprocessing
from multiprocessing import Pool
filediff = open("sample.txt", "r", encoding ="latin-1")
filediffline = filediff.readlines()
pos = []
neg = []
cpuCores = multiprocessing.cpu_count() - 1
pool = Pool(processes = cpuCores)
for line in filediffline:
result = pool.apply_async(cleanLine, [line]).get()
if line.startswith("+"):
pos.append(result)
elif line.startswith("-"):
neg.append(result)
pool.close()
pool.join()
使用apply_async().get()
等同于阻塞调用apply()
。对于异步处理,尝试利用 apply_async
和回调参数来处理结果。请记住,回调是在单独的线程中调用的。
如前所述,result = pool.apply_async(cleanLine, [line]).get()
向子进程发送单行并等待它到 return。这比仅在父进程中进行处理要慢。即使您重做那一点,也不太可能加快速度,除非预处理是 CPU 密集的。
另一种方法是构建管道,方法是将预处理放入单独的文件中并使用 subprocess.Popen
或使用 multiprocessing.Pipe
执行。使用这种方法,文件读取和行处理都在单独的进程中完成。
这样做的好处是文件读取+预处理与主进程的工作重叠。但是,如果与序列化对象以将其从一个进程转移到另一个进程的成本相比,这种预处理是微不足道的,那么您将看不到任何加速。
import multiprocessing as mp
pos = []
neg = []
def line_cleaner(line):
return line.strip()
def cleaner(filename, encoding, pipe):
try:
with open(filename, encoding=encoding) as fp:
for line in fp:
line = line_cleaner(line)
if line:
pipe.send(line)
finally:
pipe.close()
if __name__ == "__main__":
receiver, sender = mp.Pipe(duplex=False)
process = mp.Process(target=cleaner,
args=("sample.txt", "latin-1", sender))
process.start()
sender.close() # so child holds only reference
try:
while True:
line = receiver.recv()
if line.startswith("+"):
pos.append(line)
elif line.startswith("-"):
neg.append(line)
except EOFError:
pass # child exit
finally:
process.join()
print(pos, neg)
您正在与 IO
合作。我不确定你的处理是 CPU-bound
还是 IO-Bound
operation/process。如前所述,如果你读到 list
的整行,那就意味着你读到的所有 IO
都在 RAM 中(在这种情况下考虑使用 file.read()!这如果您的数据或文件太大,会有副作用),并且在列表上完成所有这些数据的处理,然后您会看到性能有所提升(取决于列表大小),只有在这种情况下,您有足够大的在 ram 上列出,我建议使用 concurent.futures
模块,见下文:
import concurrent.futures
def process_line(line):
return line.strip()
def execute(filename):
lines = []
with open(filename, encoding=encoding) as fp:
lines = fp.read()
with concurrent.futures.ProcessPoolExecutor() as executor:
results = [executor.submit(process_line(line)) for line in lines]
我试图一次读取一个文件的多行,以便将这些行分成两个单独的列表。 cleanLine 函数本质上接受输入的行并对其进行清理,返回没有空格的行。现在我的代码编译 returns 与没有多处理的结果相同,但是,脚本的总体 运行 时间没有改善所以我不确定它是否真的同时产生多个进程或者是否它只是一次做一个。在这种特定情况下,我不太确定如何判断它实际上是在创建多个进程还是只创建一个。有什么原因导致脚本的这一部分没有 运行 更快,还是我做错了?任何帮助或反馈将不胜感激。
代码片段:
import multiprocessing
from multiprocessing import Pool
filediff = open("sample.txt", "r", encoding ="latin-1")
filediffline = filediff.readlines()
pos = []
neg = []
cpuCores = multiprocessing.cpu_count() - 1
pool = Pool(processes = cpuCores)
for line in filediffline:
result = pool.apply_async(cleanLine, [line]).get()
if line.startswith("+"):
pos.append(result)
elif line.startswith("-"):
neg.append(result)
pool.close()
pool.join()
使用apply_async().get()
等同于阻塞调用apply()
。对于异步处理,尝试利用 apply_async
和回调参数来处理结果。请记住,回调是在单独的线程中调用的。
如前所述,result = pool.apply_async(cleanLine, [line]).get()
向子进程发送单行并等待它到 return。这比仅在父进程中进行处理要慢。即使您重做那一点,也不太可能加快速度,除非预处理是 CPU 密集的。
另一种方法是构建管道,方法是将预处理放入单独的文件中并使用 subprocess.Popen
或使用 multiprocessing.Pipe
执行。使用这种方法,文件读取和行处理都在单独的进程中完成。
这样做的好处是文件读取+预处理与主进程的工作重叠。但是,如果与序列化对象以将其从一个进程转移到另一个进程的成本相比,这种预处理是微不足道的,那么您将看不到任何加速。
import multiprocessing as mp
pos = []
neg = []
def line_cleaner(line):
return line.strip()
def cleaner(filename, encoding, pipe):
try:
with open(filename, encoding=encoding) as fp:
for line in fp:
line = line_cleaner(line)
if line:
pipe.send(line)
finally:
pipe.close()
if __name__ == "__main__":
receiver, sender = mp.Pipe(duplex=False)
process = mp.Process(target=cleaner,
args=("sample.txt", "latin-1", sender))
process.start()
sender.close() # so child holds only reference
try:
while True:
line = receiver.recv()
if line.startswith("+"):
pos.append(line)
elif line.startswith("-"):
neg.append(line)
except EOFError:
pass # child exit
finally:
process.join()
print(pos, neg)
您正在与 IO
合作。我不确定你的处理是 CPU-bound
还是 IO-Bound
operation/process。如前所述,如果你读到 list
的整行,那就意味着你读到的所有 IO
都在 RAM 中(在这种情况下考虑使用 file.read()!这如果您的数据或文件太大,会有副作用),并且在列表上完成所有这些数据的处理,然后您会看到性能有所提升(取决于列表大小),只有在这种情况下,您有足够大的在 ram 上列出,我建议使用 concurent.futures
模块,见下文:
import concurrent.futures
def process_line(line):
return line.strip()
def execute(filename):
lines = []
with open(filename, encoding=encoding) as fp:
lines = fp.read()
with concurrent.futures.ProcessPoolExecutor() as executor:
results = [executor.submit(process_line(line)) for line in lines]