Parallelizing a Python script with a Python wrapper
I have a python script heavy_lifting.py that I have parallelized using GNU Parallel, called from a bash wrapper script wrapper.sh. I use it to process files in fastq format; see example.fastq below. While this works, it is inelegant to require two interpreters and two sets of dependencies. I would like to rewrite the bash wrapper script in python while achieving the same parallelization.
example.fastq
This is an example of the kind of input file that needs to be processed. These input files are usually very long (~500,000,000 lines).
@SRR6750041.1 1/1
CTGGANAAGTGAAATAATATAAATTTTTCCACTATTGAATAAAAGCAACTTAAATTTTCTAAGTCG
+
AAAAA#EEEEEEEEEEEEEEEEEEEEEEEAEEEEEEEEEEEEEEEEEEEEEEEEEA<AAEEEEE<6
@SRR6750041.2 2/1
CTATANTATTCTATATTTATTCTAGATAAAAGCATTCTATATTTAGCATATGTCTAGCAAAAAAAA
+
AAAAA#EE6EEEEEEEEEEEEAAEEAEEEEEEEEEEEE/EAE/EAE/EA/EAEAAAE//EEAEAA6
@SRR6750041.3 3/1
ATCCANAATGATGTGTTGCTCTGGAGGTACAGAGATAACGTCAGCTGGAATAGTTTCCCCTCACAG
+
AAAAA#EE6E6EEEEEE6EEEEAEEEEEEEEEEE//EAEEEEEAAEAEEEAE/EAEEA6/EEA<E/
@SRR6750041.4 4/1
ACACCNAATGCTCTGGCCTCTCAAGCACGTGGATTATGCCAGAGAGGCCAGAGCATTCTTCGTACA
+
/AAAA#EEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEAE/E/<//AEA/EA//E//
Below are minimal reproducible examples of the scripts I started with.
heavy_lifting.py
#!/usr/bin/env python
import argparse
# Read in arguments
parser = argparse.ArgumentParser()
parser.add_argument('-i', '--inputFastq', required=True, help='forward .fastq')
parser.add_argument('-o', '--outputFastq', required=True, help='output .fastq')
args = parser.parse_args()
# Iterate through input file and append to output file
with open(args.inputFastq, "r") as infile:
    with open(args.outputFastq, "a") as outfile:
        for line in infile:
            outfile.write("modified" + line)
wrapper.sh
#!/bin/bash
NUMCORES="4"
FASTQ_F="./fastq_F.fastq"
# split the input fastq for parallel processing. One split fastq file will be created for each core available.
split --number="l/$NUMCORES" $FASTQ_F split_fastq_F_
# Feed split fastq files to GNU Parallel to invoke parallel executions of `heavy_lifting.py`
ls split_fastq_F* | awk -F "split_fastq_F" '{print $2}' | parallel "python heavy_lifting.py -i split_fastq_F{} -o output.fastq"
#remove intermediate split fastq files
rm split_fastq_*
To execute these scripts I use the command bash wrapper.sh. You can see that a results file output.fastq is created, containing the modified fastq records.
Below is my attempt to invoke the parallel processing using a python wrapper, wrapper.py.
wrapper.py
#!/usr/bin/env python
import heavy_lifting
from joblib import Parallel, delayed
import multiprocessing
numcores = 4
fastq_F = "fastq_F.fastq"
#Create some logic to split the input fastq file into chunks for parallel processing.
# Get input fastq file dimensions
with open(fastq_F, "r") as infile:
    length_fastq = len(infile.readlines())
    print(length_fastq)
    lines = infile.readlines()
split_size = length_fastq / numcores
print(split_size)
# Iterate through input fastq file writing lines to outfile in bins.
counter = 0
split_counter = 0
split_fastq_list = []
with open(fastq_F, "r") as infile:
    for line in infile:
        if counter == 0:
            filename = str("./split_fastq_F_" + str(split_counter))
            split_fastq_list.append(filename)
            outfile = open(filename, "a")
            counter += 1
        elif counter <= split_size:
            outfile.write(line.strip())
            counter += 1
        else:
            counter = 0
            split_counter += 1
            outfile.close()
Parallel(n_jobs=numcores)(delayed(heavy_lifting)(i, "output.fastq") for i in split_fastq_list)
Edited to improve the reproducibility of wrapper.py.
What I seem to be most confused about is how to correctly feed the input arguments to the Parallel call in the python wrapper.py script. Any help is much appreciated!
Parallel needs the name of a function, not a file/module name.
So in heavy_lifting you have to put the code into a function (using parameters instead of args):
def my_function(inputFastq, outputFastq):
    with open(inputFastq, "r") as infile:
        with open(outputFastq, "a") as outfile:
            for line in infile:
                outfile.write("modified" + line)
and then you can use
Parallel(n_jobs=numcores)(delayed(heavy_lifting.my_function)(i, "output.fastq") for i in split_fastq_list)
For reproducibility, I implemented the answer provided by furas into the heavy_lifting.py and wrapper.py scripts below. Additional edits were needed to make the code run, which is why I am providing the following.
heavy_lifting.py
#!/usr/bin/env python
import argparse
def heavy_lifting_fun(inputFastq, outputFastq):
    # Iterate through input file and append to output file
    outfile = open(outputFastq, "a")
    with open(inputFastq, "r") as infile:
        for line in infile:
            outfile.write("modified" + line.strip() + "\n")
    outfile.close()

if __name__ == '__main__':
    # Read in arguments only when run as a standalone script,
    # so that importing this module from wrapper.py does not trigger argparse
    parser = argparse.ArgumentParser()
    parser.add_argument('-i', '--inputFastq', required=True, help='forward .fastq')
    parser.add_argument('-o', '--outputFastq', required=True, help='output .fastq')
    args = parser.parse_args()
    heavy_lifting_fun(args.inputFastq, args.outputFastq)
wrapper.py
#!/usr/bin/env python
import heavy_lifting
from joblib import Parallel, delayed
import multiprocessing
numcores = 4
fastq_F = "fastq_F.fastq"
#Create some logic to split the input fastq file into chunks for parallel processing.
# Get input fastq file dimensions
with open(fastq_F, "r") as infile:
    length_fastq = len(infile.readlines())
    print(length_fastq)
split_size = length_fastq // numcores  # integer division so the rounding loop below terminates
# Round split_size up to a multiple of 4 so 4-line fastq records are not split across chunks
while (split_size % 4 != 0):
    split_size += 1
print(split_size)
# Iterate through input fastq file writing lines to outfile in bins.
counter = 0
split_counter = 0
split_fastq_list = []
with open(fastq_F, "r") as infile:
    for line in infile:
        print(counter)
        #if counter == 0 and line[0] != "@":
        #    continue
        if counter == 0:
            filename = str("./split_fastq_F_" + str(split_counter))
            split_fastq_list.append(filename)
            outfile = open(filename, "a")
            outfile.write(str(line.strip() + "\n"))
            counter += 1
        elif counter < split_size:
            outfile.write(str(line.strip() + "\n"))
            counter += 1
        else:
            counter = 0
            split_counter += 1
            outfile.close()
            filename = str("./split_fastq_F_" + str(split_counter))
            split_fastq_list.append(filename)
            outfile = open(filename, "a")
            outfile.write(str(line.strip() + "\n"))
            counter += 1
outfile.close()
Parallel(n_jobs=numcores)(delayed(heavy_lifting.heavy_lifting_fun)(i, "output.fastq") for i in split_fastq_list)
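For comparison, here is a minimal alternative sketch of the same splitting idea that keeps chunk boundaries strictly on 4-line FASTQ records and then dispatches the chunks with joblib. The file name split_and_run.py, the helper split_fastq, and the chunk naming are illustrative assumptions rather than the OP's code; like the wrapper above, it loads the whole input into memory and has every job append to the same output.fastq, so chunk order in the output is not guaranteed.
#!/usr/bin/env python
# split_and_run.py -- hypothetical alternative to wrapper.py above
from joblib import Parallel, delayed
import heavy_lifting

NUMCORES = 4
FASTQ_F = "fastq_F.fastq"

def split_fastq(path, n_chunks):
    # Split a FASTQ file into at most n_chunks files, keeping 4-line records intact.
    with open(path, "r") as fh:
        lines = fh.readlines()
    n_records = len(lines) // 4
    records_per_chunk = (n_records + n_chunks - 1) // n_chunks  # ceiling division
    chunk_paths = []
    for i in range(n_chunks):
        start = i * records_per_chunk * 4
        chunk = lines[start:start + records_per_chunk * 4]
        if not chunk:
            break
        chunk_path = "./split_fastq_F_" + str(i)
        with open(chunk_path, "w") as out:
            out.writelines(chunk)
        chunk_paths.append(chunk_path)
    return chunk_paths

if __name__ == "__main__":
    chunks = split_fastq(FASTQ_F, NUMCORES)
    Parallel(n_jobs=NUMCORES)(
        delayed(heavy_lifting.heavy_lifting_fun)(c, "output.fastq") for c in chunks)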
This should be a comment, because it does not answer the question, but it is too big for one.
All of wrapper.sh can be written as:
parallel -a ./fastq_F.fastq --recstart @SRR --block -1 --pipepart --cat "python heavy_lifting.py -i {} -o output.fastq"
If heavy_lifting.py only reads the file and does not seek, this should also work, and it requires less disk I/O (the temporary file is replaced with a fifo):
parallel -a ./fastq_F.fastq --recstart @SRR --block -1 --pipepart --fifo "python heavy_lifting.py -i {} -o output.fastq"
It will automatically detect the number of CPU threads, split the fastq file at lines starting with @SRR, split it on the fly into one block per CPU thread, and feed the blocks to python.
If heavy_lifting.py reads from standard input when no -i is given, then this should also work:
parallel -a ./fastq_F.fastq --recstart @SRR --block -1 --pipepart "python heavy_lifting.py -o output.fastq"
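That would require a small change to heavy_lifting.py. A minimal sketch of such a stdin fallback is shown below (making -i optional is an assumption for illustration, not the OP's script):
#!/usr/bin/env python
import argparse
import sys

parser = argparse.ArgumentParser()
# -i is optional here; when it is not given we read from standard input
parser.add_argument('-i', '--inputFastq', help='forward .fastq (default: stdin)')
parser.add_argument('-o', '--outputFastq', required=True, help='output .fastq')
args = parser.parse_args()

infile = open(args.inputFastq, "r") if args.inputFastq else sys.stdin
with open(args.outputFastq, "a") as outfile:
    for line in infile:
        outfile.write("modified" + line)
if args.inputFastq:
    infile.close()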
If heavy_lifting.py does not append a unique string to output.fastq, it will be overwritten. So it is probably better to have GNU Parallel give it a unique name, such as output2.fastq:
parallel -a ./fastq_F.fastq --recstart @SRR --block -1 --pipepart "python heavy_lifting.py -o output{#}.fastq"
For a more general parallel FASTQ wrapper, see: