与 python 在多处理中共享字典
Sharing dictionary in multiprocessing with python
在我的程序中,我需要在 Python 的多进程中共享一个字典。我已经简化了代码以在此处放置一个示例:
import multiprocessing
def folding (return_dict, seq):
dis = 1
d = 0
ddg = 1
'''This is irrelevant, actually my program sends seq parameter to other extern program that returns dis, d and ddg parameters'''
return_dict [seq] = [dis, d, ddg]
seqs = ['atcgtg', 'agcgatcg', 'atcgatcgatc', atcggatcg', agctgctagct']
manager = Manager()
return_dict = manager.dict()
n_cores = 3
for i in range (0, len(seqs), n_cores) #n_cores is the number of cores availables in the computer, defined by the user
subseqs = seqs[i:i + n_cores]
processes = [Process(target=folding, args =(return_dict, seq)) for seq in subseqs]
for p in processes:
p.start()
for p in processes:
p.join()
for i in retun_dict:
print i
我希望在程序结束时得到具有所有 属性 值的 return_dict。
当我 运行 我的程序必须使用包含数千个序列的 seqs 列表并重复多次时,有时我会得到正确的结果,但有时(大部分时间)程序永远不会结束,但返回任何错误,我不知道出了什么问题。
此外,我认为这在时间上不是很有效,我想知道是否有其他方法可以最有效和更快地做到这一点。
谢谢大家!
修复了一些小的语法错误后,您的代码似乎可以正常工作。
但是,我会使用 multiprocessing pool 而不是您的自定义解决方案 运行 总是一次 n_cores
个进程。您的方法的问题是在开始下一批之前需要完成所有过程。根据您需要计算 folding
的时间的可变性,您可能会遇到瓶颈。在最坏的情况下,这意味着与单核处理相比没有任何加速。
此外,您的程序将运行在Windows下陷入严重问题。您需要确保可以安全地导入主模块,而无需重新 运行 处理您的多处理代码。也就是说,您需要通过 if __name__ == '__main___'
保护您的主要 入口点 ,您可能已经在其他 python 脚本中看到过。这将确保您的受保护代码仅在您的脚本作为解释器的 main 文件启动时才执行,即仅执行一次,而不是由您生成的每个新子进程执行。
这是我使用池对您的代码稍作修改的版本:
import multiprocessing as mp
def folding(return_dict, seq):
dis = 1
d = 0
ddg = 1
'''This is irrelevant, actually my program sends seq parameter to other extern program that returns dis, d and ddg parameters'''
return_dict[seq] = [dis, d, ddg]
def main():
seqs = ['atcgtg', 'agcgatcg', 'atcgatcgatc', 'atcggatcg', 'agctgctagct']
manager = mp.Manager()
return_dict = manager.dict()
n_cores = 3
# created pool running maximum 3 cores
pool = mp.Pool(n_cores)
# Execute the folding task in parallel
for seq in seqs:
pool.apply_async(folding, args=(return_dict, seq))
# Tell the pool that there are no more tasks to come and join
pool.close()
pool.join()
# Print the results
for i in return_dict.keys():
print(i, return_dict[i])
if __name__ == '__main__':
# Protected main function
main()
这将打印
atcgtg [1, 0, 1]
atcgatcgatc [1, 0, 1]
agcgatcg [1, 0, 1]
atcggatcg [1, 0, 1]
agctgctagct [1, 0, 1]
没有共享数据的例子
编辑: 同样在您的情况下,实际上不需要共享数据结构。您可以简单地依赖 pool's map 函数。 map 接受一个 iterable ,然后用它调用函数 folding
一次迭代的所有元素。使用 map 而不是 map_asnyc 的优点是结果按输入顺序排列。但是你需要等到所有的结果都收集到才能处理它们。
这里是一个使用 map 的例子。请注意,您的函数 folding
现在 returns 结果而不是将其放入共享字典中:
import multiprocessing as mp
def folding(seq):
dis = 1
d = 0
ddg = 1
'''This is irrelevant, actually my program sends seq parameter to other extern program that returns dis, d and ddg parameters'''
# Return results instead of using shared data
return [dis, d, ddg]
def main():
seqs = ['atcgtg', 'agcgatcg', 'atcgatcgatc', 'atcggatcg', 'agctgctagct']
n_cores = 3
pool = mp.Pool(n_cores)
# Use map which blocks until all results are ready:
res = pool.map(folding, iterable=seqs)
pool.close()
pool.join()
# Print the results
# Using list(zip(..)) to print inputs next to outputs
print(list(zip(seqs, res)))
if __name__ == '__main__':
main()
这一个打印
[('atcgtg', [1, 0, 1]), ('agcgatcg', [1, 0, 1]), ('atcgatcgatc', [1, 0, 1]), ('atcggatcg', [1, 0, 1]), ('agctgctagct', [1, 0, 1])]
在我的程序中,我需要在 Python 的多进程中共享一个字典。我已经简化了代码以在此处放置一个示例:
import multiprocessing
def folding (return_dict, seq):
dis = 1
d = 0
ddg = 1
'''This is irrelevant, actually my program sends seq parameter to other extern program that returns dis, d and ddg parameters'''
return_dict [seq] = [dis, d, ddg]
seqs = ['atcgtg', 'agcgatcg', 'atcgatcgatc', atcggatcg', agctgctagct']
manager = Manager()
return_dict = manager.dict()
n_cores = 3
for i in range (0, len(seqs), n_cores) #n_cores is the number of cores availables in the computer, defined by the user
subseqs = seqs[i:i + n_cores]
processes = [Process(target=folding, args =(return_dict, seq)) for seq in subseqs]
for p in processes:
p.start()
for p in processes:
p.join()
for i in retun_dict:
print i
我希望在程序结束时得到具有所有 属性 值的 return_dict。 当我 运行 我的程序必须使用包含数千个序列的 seqs 列表并重复多次时,有时我会得到正确的结果,但有时(大部分时间)程序永远不会结束,但返回任何错误,我不知道出了什么问题。 此外,我认为这在时间上不是很有效,我想知道是否有其他方法可以最有效和更快地做到这一点。 谢谢大家!
修复了一些小的语法错误后,您的代码似乎可以正常工作。
但是,我会使用 multiprocessing pool 而不是您的自定义解决方案 运行 总是一次 n_cores
个进程。您的方法的问题是在开始下一批之前需要完成所有过程。根据您需要计算 folding
的时间的可变性,您可能会遇到瓶颈。在最坏的情况下,这意味着与单核处理相比没有任何加速。
此外,您的程序将运行在Windows下陷入严重问题。您需要确保可以安全地导入主模块,而无需重新 运行 处理您的多处理代码。也就是说,您需要通过 if __name__ == '__main___'
保护您的主要 入口点 ,您可能已经在其他 python 脚本中看到过。这将确保您的受保护代码仅在您的脚本作为解释器的 main 文件启动时才执行,即仅执行一次,而不是由您生成的每个新子进程执行。
这是我使用池对您的代码稍作修改的版本:
import multiprocessing as mp
def folding(return_dict, seq):
dis = 1
d = 0
ddg = 1
'''This is irrelevant, actually my program sends seq parameter to other extern program that returns dis, d and ddg parameters'''
return_dict[seq] = [dis, d, ddg]
def main():
seqs = ['atcgtg', 'agcgatcg', 'atcgatcgatc', 'atcggatcg', 'agctgctagct']
manager = mp.Manager()
return_dict = manager.dict()
n_cores = 3
# created pool running maximum 3 cores
pool = mp.Pool(n_cores)
# Execute the folding task in parallel
for seq in seqs:
pool.apply_async(folding, args=(return_dict, seq))
# Tell the pool that there are no more tasks to come and join
pool.close()
pool.join()
# Print the results
for i in return_dict.keys():
print(i, return_dict[i])
if __name__ == '__main__':
# Protected main function
main()
这将打印
atcgtg [1, 0, 1]
atcgatcgatc [1, 0, 1]
agcgatcg [1, 0, 1]
atcggatcg [1, 0, 1]
agctgctagct [1, 0, 1]
没有共享数据的例子
编辑: 同样在您的情况下,实际上不需要共享数据结构。您可以简单地依赖 pool's map 函数。 map 接受一个 iterable ,然后用它调用函数 folding
一次迭代的所有元素。使用 map 而不是 map_asnyc 的优点是结果按输入顺序排列。但是你需要等到所有的结果都收集到才能处理它们。
这里是一个使用 map 的例子。请注意,您的函数 folding
现在 returns 结果而不是将其放入共享字典中:
import multiprocessing as mp
def folding(seq):
dis = 1
d = 0
ddg = 1
'''This is irrelevant, actually my program sends seq parameter to other extern program that returns dis, d and ddg parameters'''
# Return results instead of using shared data
return [dis, d, ddg]
def main():
seqs = ['atcgtg', 'agcgatcg', 'atcgatcgatc', 'atcggatcg', 'agctgctagct']
n_cores = 3
pool = mp.Pool(n_cores)
# Use map which blocks until all results are ready:
res = pool.map(folding, iterable=seqs)
pool.close()
pool.join()
# Print the results
# Using list(zip(..)) to print inputs next to outputs
print(list(zip(seqs, res)))
if __name__ == '__main__':
main()
这一个打印
[('atcgtg', [1, 0, 1]), ('agcgatcg', [1, 0, 1]), ('atcgatcgatc', [1, 0, 1]), ('atcggatcg', [1, 0, 1]), ('agctgctagct', [1, 0, 1])]