在池多处理中写入文件(Python 2.7)
Writing to file in Pool multiprocessing (Python 2.7)
我正在做大量计算并将结果写入文件。使用多处理我正在尝试并行化计算。
这里的问题是我正在写入一个输出文件,所有工作人员也在写入该文件。我对多处理很陌生,想知道如何让它工作。
下面给出一个非常简单的代码概念:
from multiprocessing import Pool
fout_=open('test'+'.txt','w')
def f(x):
fout_.write(str(x) + "\n")
if __name__ == '__main__':
p = Pool(5)
p.map(f, [1, 2, 3])
我想要的结果是一个包含以下内容的文件:
1 2 3
但是现在我得到一个空文件。有什么建议么?
我非常感谢任何帮助:)!
Multiprocessing.pool 产生进程,在没有每个进程锁定的情况下写入公共文件会导致数据丢失。
正如您所说,您正在尝试并行化计算,multiprocessing.pool 可用于并行化计算。
下面是并行计算并将结果写入文件的解决方案,希望对您有所帮助:
from multiprocessing import Pool
# library for time
import datetime
# file in which you want to write
fout = open('test.txt', 'wb')
# function for your calculations, i have tried it to make time consuming
def calc(x):
x = x**2
sum = 0
for i in range(0, 1000000):
sum += i
return x
# function to write in txt file, it takes list of item to write
def f(res):
global fout
for x in res:
fout.write(str(x) + "\n")
if __name__ == '__main__':
qs = datetime.datetime.now()
arr = [1, 2, 3, 4, 5, 6, 7]
p = Pool(5)
res = p.map(calc, arr)
# write the calculated list in file
f(res)
qe = datetime.datetime.now()
print (qe-qs).total_seconds()*1000
# to compare the improvement using multiprocessing, iterative solution
qs = datetime.datetime.now()
for item in arr:
x = calc(item)
fout.write(str(x)+"\n")
qe = datetime.datetime.now()
print (qe-qs).total_seconds()*1000
您不应该让所有 workers/processes 写入一个文件。他们都可以从一个文件中读取(这可能会由于工作人员等待其中一个文件完成读取而导致速度变慢),但是写入同一个文件会导致冲突和潜在的损坏。
如评论中所述,改为写入单独的文件,然后在单个进程中将它们合并为一个文件。这个小程序是根据你post:
中的程序来说明的
from multiprocessing import Pool
def f(args):
''' Perform computation and write
to separate file for each '''
x = args[0]
fname = args[1]
with open(fname, 'w') as fout:
fout.write(str(x) + "\n")
def fcombine(orig, dest):
''' Combine files with names in
orig into one file named dest '''
with open(dest, 'w') as fout:
for o in orig:
with open(o, 'r') as fin:
for line in fin:
fout.write(line)
if __name__ == '__main__':
# Each sublist is a combination
# of arguments - number and temporary output
# file name
x = range(1,4)
names = ['temp_' + str(y) + '.txt' for y in x]
args = list(zip(x,names))
p = Pool(3)
p.map(f, args)
p.close()
p.join()
fcombine(names, 'final.txt')
它为每个参数组合运行 f
,在本例中是 x 的值和临时文件名。它使用一个嵌套的参数组合列表,因为 pool.map
不接受多个参数。还有其他方法可以解决这个问题,尤其是在较新的 Python 版本上。
对于每个参数组合和池成员,它都会创建一个单独的文件,并将输出写入其中。原则上你的输出会更长,你可以简单地添加另一个计算它的函数到 f
函数。此外,无需将 Pool(5) 用于 3 个参数(尽管我假设无论如何只有三个工作人员处于活动状态)。
调用 close()
和 join()
的原因在 this post 中有很好的解释。事实证明(在对链接 post 的评论中)map
正在阻塞,所以在这里你不需要它们是出于最初的原因(等到它们全部完成然后写入组合输出来自一个进程的文件)。如果以后添加其他并行功能,我仍然会使用它们。
在最后一步中,fcombine
收集所有临时文件并将其复制到一个文件中。它有点太嵌套了,例如,如果您决定在复制后删除临时文件,您可能需要在 with open('dest', )..
或下面的 for 循环下使用单独的函数 - 以提高可读性和功能性。
我正在做大量计算并将结果写入文件。使用多处理我正在尝试并行化计算。
这里的问题是我正在写入一个输出文件,所有工作人员也在写入该文件。我对多处理很陌生,想知道如何让它工作。
下面给出一个非常简单的代码概念:
from multiprocessing import Pool
fout_=open('test'+'.txt','w')
def f(x):
fout_.write(str(x) + "\n")
if __name__ == '__main__':
p = Pool(5)
p.map(f, [1, 2, 3])
我想要的结果是一个包含以下内容的文件:
1 2 3
但是现在我得到一个空文件。有什么建议么? 我非常感谢任何帮助:)!
Multiprocessing.pool 产生进程,在没有每个进程锁定的情况下写入公共文件会导致数据丢失。 正如您所说,您正在尝试并行化计算,multiprocessing.pool 可用于并行化计算。
下面是并行计算并将结果写入文件的解决方案,希望对您有所帮助:
from multiprocessing import Pool
# library for time
import datetime
# file in which you want to write
fout = open('test.txt', 'wb')
# function for your calculations, i have tried it to make time consuming
def calc(x):
x = x**2
sum = 0
for i in range(0, 1000000):
sum += i
return x
# function to write in txt file, it takes list of item to write
def f(res):
global fout
for x in res:
fout.write(str(x) + "\n")
if __name__ == '__main__':
qs = datetime.datetime.now()
arr = [1, 2, 3, 4, 5, 6, 7]
p = Pool(5)
res = p.map(calc, arr)
# write the calculated list in file
f(res)
qe = datetime.datetime.now()
print (qe-qs).total_seconds()*1000
# to compare the improvement using multiprocessing, iterative solution
qs = datetime.datetime.now()
for item in arr:
x = calc(item)
fout.write(str(x)+"\n")
qe = datetime.datetime.now()
print (qe-qs).total_seconds()*1000
您不应该让所有 workers/processes 写入一个文件。他们都可以从一个文件中读取(这可能会由于工作人员等待其中一个文件完成读取而导致速度变慢),但是写入同一个文件会导致冲突和潜在的损坏。
如评论中所述,改为写入单独的文件,然后在单个进程中将它们合并为一个文件。这个小程序是根据你post:
中的程序来说明的from multiprocessing import Pool
def f(args):
''' Perform computation and write
to separate file for each '''
x = args[0]
fname = args[1]
with open(fname, 'w') as fout:
fout.write(str(x) + "\n")
def fcombine(orig, dest):
''' Combine files with names in
orig into one file named dest '''
with open(dest, 'w') as fout:
for o in orig:
with open(o, 'r') as fin:
for line in fin:
fout.write(line)
if __name__ == '__main__':
# Each sublist is a combination
# of arguments - number and temporary output
# file name
x = range(1,4)
names = ['temp_' + str(y) + '.txt' for y in x]
args = list(zip(x,names))
p = Pool(3)
p.map(f, args)
p.close()
p.join()
fcombine(names, 'final.txt')
它为每个参数组合运行 f
,在本例中是 x 的值和临时文件名。它使用一个嵌套的参数组合列表,因为 pool.map
不接受多个参数。还有其他方法可以解决这个问题,尤其是在较新的 Python 版本上。
对于每个参数组合和池成员,它都会创建一个单独的文件,并将输出写入其中。原则上你的输出会更长,你可以简单地添加另一个计算它的函数到 f
函数。此外,无需将 Pool(5) 用于 3 个参数(尽管我假设无论如何只有三个工作人员处于活动状态)。
调用 close()
和 join()
的原因在 this post 中有很好的解释。事实证明(在对链接 post 的评论中)map
正在阻塞,所以在这里你不需要它们是出于最初的原因(等到它们全部完成然后写入组合输出来自一个进程的文件)。如果以后添加其他并行功能,我仍然会使用它们。
在最后一步中,fcombine
收集所有临时文件并将其复制到一个文件中。它有点太嵌套了,例如,如果您决定在复制后删除临时文件,您可能需要在 with open('dest', )..
或下面的 for 循环下使用单独的函数 - 以提高可读性和功能性。