Nextflow:我应该如何在 X 行截断 fastq 文件?进程失败并出现错误 141
Nextflow: how should I truncate a fastq file at line X? Process fails with error 141
我正在尝试创建一个进程,其中 gzipped fastq 文件被截断为 900k 行。如果它少于 900k 行,代码将执行但不修改文件。但如果文件超过 900k 行,则新文件应该只有 900k 行。我已经尝试使用 sed
和 head
,将文件的 zcat
命令通过管道传输到其中一个,但是该过程失败并显示 error code: 141
,这就是我的原因我发现,与管道有关。
我的代码,使用head
和paired end reads如下:
process TRUNCATE_FASTQ {
input:
set val(sample), val(single_end), path(reads) from ch_reads
output:
path foo into ch_output
script:
zcat ${reads[0]} | head -900000 > truncated_file_1
gzip truncated_file_1
mv truncated_file_1.gz truncated_${reads[0]}
zcat ${reads[1]} | head -900000 > truncated_file_2
gzip truncated_file_2
mv truncated_file_2.gz truncated_${reads[1]}
}
我使用 sed
的代码是:(输入和输出相同,只是脚本有所变化)
script:
sed -i -n '1,900000p' ${reads[0]}
sed -i -n '1,900000p' ${reads[1]}
所以我的问题是:我应该如何修改fastq文件并在进程内截断它?我的目标只是在这个过程之后用一个较小的 fastq 文件执行整个管道。
谢谢
所以!
经过一些测试,似乎正如@tripleee 所说,使用 head
时出现错误信号。
我不完全理解这背后的动机,但是this answer在另一个问题中发现,似乎至少解决了我的问题。
所以最终的代码是这样的:
process TRUNCATE_FASTQ {
input:
set val(sample), val(single_end), path(reads) from ch_reads
output:
path foo into ch_output
script:
zcat ${reads[0]} | awk '(NR<=900000)' > truncated_file_1
gzip truncated_file_1
您可能在 nextflow.config
:
中添加了如下内容
process {
shell = [ '/bin/bash', '-euo', 'pipefail' ]
}
你得到一个错误,因为 zcat 已经提前关闭了它的目的地,并且无法(成功地)写入解压缩的文件。它无法知道这是故意的,或者是否发生了不好的事情(例如磁盘不足 space、网络错误等)。
如果您不想更改 shell 选项,您可以改用读取整个输入流的工具。这就是您的 awk 解决方案所做的,与以下内容相同:
zcat test.fastq.gz | awk 'NR<=900000 { print [=11=] }'
对于每条输入记录,AWK 测试当前记录号 (NR) 以查看它是否小于 900000。如果小于,则将整行打印到标准输出。如果不是,请尝试查看下一行是否小于 900000。此解决方案的问题是,如果您的输入 FASTQ 非常大,这将很慢。所以你可能会想使用:
zcat test.fastq.gz | awk 'NR>900000 { exit }'
但这与最初的 head 解决方案非常相似,并且会在您当前的 shell 选项中产生错误。更好的解决方案(让您保留当前的 shell 选项)是使用进程替换来避免管道:
head -n 900000 < <(zcat test.fastq.gz)
您的 Nextflow 代码可能如下所示:
nextflow.enable.dsl=2
params.max_lines = 900000
process TRUNCATE_FASTQ {
input:
tuple val(sample), path(reads)
script:
def (fq1, fq2) = reads
"""
head -n ${params.max_lines} < <(zcat "${fq1}") | gzip > "truncated_${fq1}"
head -n ${params.max_lines} < <(zcat "${fq2}") | gzip > "truncated_${fq2}"
"""
}
workflow {
TRUNCATE_FASTQ( Channel.fromFilePairs('*.{1,2}.fastq.gz') )
}
如果您有 Python,它具有本机 gzip 支持,您也可以为此使用 pysam。例如:
params.max_records = 225000
process TRUNCATE_FASTQ {
container 'quay.io/biocontainers/pysam:0.16.0.1--py37hc334e0b_1'
input:
tuple val(sample), path(reads)
script:
def (fq1, fq2) = reads
"""
#!/usr/bin/env python
import gzip
import pysam
def truncate(fn, num):
with pysam.FastxFile(fn) as ifh:
with gzip.open(f'truncated_{fn}', 'wt') as ofh:
for idx, entry in enumerate(ifh):
if idx >= num:
break
print(str(entry), file=ofh)
truncate('${fq1}', ${params.max_records})
truncate('${fq2}', ${params.max_records})
"""
}
或使用Biopython:
params.max_records = 225000
process TRUNCATE_FASTQ {
container 'quay.io/biocontainers/biopython:1.78'
input:
tuple val(sample), path(reads)
script:
def (fq1, fq2) = reads
"""
#!/usr/bin/env python
import gzip
from itertools import islice
from Bio import SeqIO
def xopen(fn, mode='r'):
if fn.endswith('.gz'):
return gzip.open(fn, mode)
else:
return open(fn, mode)
def truncate(fn, num):
with xopen(fn, 'rt') as ifh, xopen(f'truncated_{fn}', 'wt') as ofh:
seqs = islice(SeqIO.parse(ifh, 'fastq'), num)
SeqIO.write(seqs, ofh, 'fastq')
truncate('${fq1}', ${params.max_records})
truncate('${fq2}', ${params.max_records})
"""
}
我正在尝试创建一个进程,其中 gzipped fastq 文件被截断为 900k 行。如果它少于 900k 行,代码将执行但不修改文件。但如果文件超过 900k 行,则新文件应该只有 900k 行。我已经尝试使用 sed
和 head
,将文件的 zcat
命令通过管道传输到其中一个,但是该过程失败并显示 error code: 141
,这就是我的原因我发现,与管道有关。
我的代码,使用head
和paired end reads如下:
process TRUNCATE_FASTQ {
input:
set val(sample), val(single_end), path(reads) from ch_reads
output:
path foo into ch_output
script:
zcat ${reads[0]} | head -900000 > truncated_file_1
gzip truncated_file_1
mv truncated_file_1.gz truncated_${reads[0]}
zcat ${reads[1]} | head -900000 > truncated_file_2
gzip truncated_file_2
mv truncated_file_2.gz truncated_${reads[1]}
}
我使用 sed
的代码是:(输入和输出相同,只是脚本有所变化)
script:
sed -i -n '1,900000p' ${reads[0]}
sed -i -n '1,900000p' ${reads[1]}
所以我的问题是:我应该如何修改fastq文件并在进程内截断它?我的目标只是在这个过程之后用一个较小的 fastq 文件执行整个管道。
谢谢
所以!
经过一些测试,似乎正如@tripleee 所说,使用 head
时出现错误信号。
我不完全理解这背后的动机,但是this answer在另一个问题中发现,似乎至少解决了我的问题。
所以最终的代码是这样的:
process TRUNCATE_FASTQ {
input:
set val(sample), val(single_end), path(reads) from ch_reads
output:
path foo into ch_output
script:
zcat ${reads[0]} | awk '(NR<=900000)' > truncated_file_1
gzip truncated_file_1
您可能在 nextflow.config
:
process {
shell = [ '/bin/bash', '-euo', 'pipefail' ]
}
你得到一个错误,因为 zcat 已经提前关闭了它的目的地,并且无法(成功地)写入解压缩的文件。它无法知道这是故意的,或者是否发生了不好的事情(例如磁盘不足 space、网络错误等)。
如果您不想更改 shell 选项,您可以改用读取整个输入流的工具。这就是您的 awk 解决方案所做的,与以下内容相同:
zcat test.fastq.gz | awk 'NR<=900000 { print [=11=] }'
对于每条输入记录,AWK 测试当前记录号 (NR) 以查看它是否小于 900000。如果小于,则将整行打印到标准输出。如果不是,请尝试查看下一行是否小于 900000。此解决方案的问题是,如果您的输入 FASTQ 非常大,这将很慢。所以你可能会想使用:
zcat test.fastq.gz | awk 'NR>900000 { exit }'
但这与最初的 head 解决方案非常相似,并且会在您当前的 shell 选项中产生错误。更好的解决方案(让您保留当前的 shell 选项)是使用进程替换来避免管道:
head -n 900000 < <(zcat test.fastq.gz)
您的 Nextflow 代码可能如下所示:
nextflow.enable.dsl=2
params.max_lines = 900000
process TRUNCATE_FASTQ {
input:
tuple val(sample), path(reads)
script:
def (fq1, fq2) = reads
"""
head -n ${params.max_lines} < <(zcat "${fq1}") | gzip > "truncated_${fq1}"
head -n ${params.max_lines} < <(zcat "${fq2}") | gzip > "truncated_${fq2}"
"""
}
workflow {
TRUNCATE_FASTQ( Channel.fromFilePairs('*.{1,2}.fastq.gz') )
}
如果您有 Python,它具有本机 gzip 支持,您也可以为此使用 pysam。例如:
params.max_records = 225000
process TRUNCATE_FASTQ {
container 'quay.io/biocontainers/pysam:0.16.0.1--py37hc334e0b_1'
input:
tuple val(sample), path(reads)
script:
def (fq1, fq2) = reads
"""
#!/usr/bin/env python
import gzip
import pysam
def truncate(fn, num):
with pysam.FastxFile(fn) as ifh:
with gzip.open(f'truncated_{fn}', 'wt') as ofh:
for idx, entry in enumerate(ifh):
if idx >= num:
break
print(str(entry), file=ofh)
truncate('${fq1}', ${params.max_records})
truncate('${fq2}', ${params.max_records})
"""
}
或使用Biopython:
params.max_records = 225000
process TRUNCATE_FASTQ {
container 'quay.io/biocontainers/biopython:1.78'
input:
tuple val(sample), path(reads)
script:
def (fq1, fq2) = reads
"""
#!/usr/bin/env python
import gzip
from itertools import islice
from Bio import SeqIO
def xopen(fn, mode='r'):
if fn.endswith('.gz'):
return gzip.open(fn, mode)
else:
return open(fn, mode)
def truncate(fn, num):
with xopen(fn, 'rt') as ifh, xopen(f'truncated_{fn}', 'wt') as ofh:
seqs = islice(SeqIO.parse(ifh, 'fastq'), num)
SeqIO.write(seqs, ofh, 'fastq')
truncate('${fq1}', ${params.max_records})
truncate('${fq2}', ${params.max_records})
"""
}