多进程,多个进程读取同一个文件
Multiprocess, various process reading the same file
我正在尝试模拟一些 dna 测序读取,并且为了加快代码速度,我需要并行 运行 它。
基本上,我想做的是 following:I am 从人类基因组中读取样本,我认为来自多处理模块的两个进程之一试图从同一个文件(人类基因组)过程被破坏,无法获得所需的 DNA 序列。我尝试过不同的东西,但我对并行编程很陌生,我无法解决我的问题
当我 运行 一个核心的脚本时它工作正常。
这是我调用函数的方式
if __name__ == '__main__':
jobs = []
# init the processes
for i in range(number_of_cores):
length= 100
lock = mp.Manager().Lock()
p = mp.Process(target=simulations.sim_reads,args=(lock,FastaFile, "/home/inigo/msc_thesis/genome_data/hg38.fa",length,paired,results_dir,spawn_reads[i],temp_file_names[i]))
jobs.append(p)
p.start()
for p in jobs:
p.join()
这是我用来读取数据的函数,每个进程都将数据写入不同的文件。
def sim_single_end(lc,fastafile,chr,chr_pos_start,chr_pos_end,read_length, unique_id):
lc.acquire()
left_split_read = fastafile.fetch(chr, chr_pos_end - (read_length / 2), chr_pos_end)
right_split_read = fastafile.fetch(chr, chr_pos_start, chr_pos_start + (read_length / 2))
reversed_left_split_read = left_split_read[::-1]
total_read = reversed_left_split_read + right_split_read
seq_id = "id:%s-%s|left_pos:%s-%s|right:%s-%s " % (unique_id,chr, int(chr_pos_end - (read_length / 2)), int(chr_pos_end), int(chr_pos_start),int(chr_pos_start + (read_length / 2)))
quality = "I" * read_length
fastq_string = "@%s\n%s\n+\n%s\n" % (seq_id, total_read, quality)
lc.release()
new_record = SeqIO.read(StringIO(fastq_string), "fastq")
return(new_record)
这是回溯:
Traceback (most recent call last):
File "/usr/lib/python3.5/multiprocessing/process.py", line 249, in _bootstrap
self.run()
File "/usr/lib/python3.5/multiprocessing/process.py", line 93, in run
self._target(*self._args, **self._kwargs)
File "/home/inigo/Dropbox/PycharmProjects/circ_dna/simulations.py", line 107, in sim_ecc_reads
new_read = sim_single_end(lc,fastafile, chr, chr_pos_start, chr_pos_end, read_length, read_id)
File "/home/inigo/Dropbox/PycharmProjects/circ_dna/simulations.py", line 132, in sim_single_end
new_record = SeqIO.read(StringIO(fastq_string), "fastq")
File "/usr/local/lib/python3.5/dist-packages/Bio/SeqIO/__init__.py", line 664, in read
first = next(iterator)
File "/usr/local/lib/python3.5/dist-packages/Bio/SeqIO/__init__.py", line 600, in parse
for r in i:
File "/usr/local/lib/python3.5/dist-packages/Bio/SeqIO/QualityIO.py", line 1031, in FastqPhredIterator
for title_line, seq_string, quality_string in FastqGeneralIterator(handle):
File "/usr/local/lib/python3.5/dist-packages/Bio/SeqIO/QualityIO.py", line 951, in FastqGeneralIterator
% (title_line, seq_len, len(quality_string)))
ValueError: Lengths of sequence and quality values differs for id:6-chr1_KI270707v1_random|left_pos:50511537-50511587|right:50511214-50511264 (0 and 100).
我是将近一年前做的这个答案的 OP。问题是我用来读取人类基因组文件 (pysam) 的包失败了。问题是调用 multiprocessing 时的拼写错误。
根据作者的回复,这应该有效:
p = mp.Process(target=get_fasta, args=(genome_fa,))
注意“,”以确保传递元组
有关详细信息,请参阅 https://github.com/pysam-developers/pysam/issues/409
我正在尝试模拟一些 dna 测序读取,并且为了加快代码速度,我需要并行 运行 它。
基本上,我想做的是 following:I am 从人类基因组中读取样本,我认为来自多处理模块的两个进程之一试图从同一个文件(人类基因组)过程被破坏,无法获得所需的 DNA 序列。我尝试过不同的东西,但我对并行编程很陌生,我无法解决我的问题
当我 运行 一个核心的脚本时它工作正常。
这是我调用函数的方式
if __name__ == '__main__':
jobs = []
# init the processes
for i in range(number_of_cores):
length= 100
lock = mp.Manager().Lock()
p = mp.Process(target=simulations.sim_reads,args=(lock,FastaFile, "/home/inigo/msc_thesis/genome_data/hg38.fa",length,paired,results_dir,spawn_reads[i],temp_file_names[i]))
jobs.append(p)
p.start()
for p in jobs:
p.join()
这是我用来读取数据的函数,每个进程都将数据写入不同的文件。
def sim_single_end(lc,fastafile,chr,chr_pos_start,chr_pos_end,read_length, unique_id):
lc.acquire()
left_split_read = fastafile.fetch(chr, chr_pos_end - (read_length / 2), chr_pos_end)
right_split_read = fastafile.fetch(chr, chr_pos_start, chr_pos_start + (read_length / 2))
reversed_left_split_read = left_split_read[::-1]
total_read = reversed_left_split_read + right_split_read
seq_id = "id:%s-%s|left_pos:%s-%s|right:%s-%s " % (unique_id,chr, int(chr_pos_end - (read_length / 2)), int(chr_pos_end), int(chr_pos_start),int(chr_pos_start + (read_length / 2)))
quality = "I" * read_length
fastq_string = "@%s\n%s\n+\n%s\n" % (seq_id, total_read, quality)
lc.release()
new_record = SeqIO.read(StringIO(fastq_string), "fastq")
return(new_record)
这是回溯:
Traceback (most recent call last):
File "/usr/lib/python3.5/multiprocessing/process.py", line 249, in _bootstrap
self.run()
File "/usr/lib/python3.5/multiprocessing/process.py", line 93, in run
self._target(*self._args, **self._kwargs)
File "/home/inigo/Dropbox/PycharmProjects/circ_dna/simulations.py", line 107, in sim_ecc_reads
new_read = sim_single_end(lc,fastafile, chr, chr_pos_start, chr_pos_end, read_length, read_id)
File "/home/inigo/Dropbox/PycharmProjects/circ_dna/simulations.py", line 132, in sim_single_end
new_record = SeqIO.read(StringIO(fastq_string), "fastq")
File "/usr/local/lib/python3.5/dist-packages/Bio/SeqIO/__init__.py", line 664, in read
first = next(iterator)
File "/usr/local/lib/python3.5/dist-packages/Bio/SeqIO/__init__.py", line 600, in parse
for r in i:
File "/usr/local/lib/python3.5/dist-packages/Bio/SeqIO/QualityIO.py", line 1031, in FastqPhredIterator
for title_line, seq_string, quality_string in FastqGeneralIterator(handle):
File "/usr/local/lib/python3.5/dist-packages/Bio/SeqIO/QualityIO.py", line 951, in FastqGeneralIterator
% (title_line, seq_len, len(quality_string)))
ValueError: Lengths of sequence and quality values differs for id:6-chr1_KI270707v1_random|left_pos:50511537-50511587|right:50511214-50511264 (0 and 100).
我是将近一年前做的这个答案的 OP。问题是我用来读取人类基因组文件 (pysam) 的包失败了。问题是调用 multiprocessing 时的拼写错误。
根据作者的回复,这应该有效:
p = mp.Process(target=get_fasta, args=(genome_fa,))
注意“,”以确保传递元组
有关详细信息,请参阅 https://github.com/pysam-developers/pysam/issues/409