Nextflow:我应该如何在 X 行截断 fastq 文件?进程失败并出现错误 141

Nextflow: how should I truncate a fastq file at line X? Process fails with error 141

我正在尝试创建一个进程,其中 gzipped fastq 文件被截断为 900k 行。如果它少于 900k 行,代码将执行但不修改文件。但如果文件超过 900k 行,则新文件应该只有 900k 行。我已经尝试使用 sedhead,将文件的 zcat 命令通过管道传输到其中一个,但是该过程失败并显示 error code: 141,这就是我的原因我发现,与管道有关。

我的代码,使用head和paired end reads如下:

process TRUNCATE_FASTQ {

input:
set val(sample), val(single_end), path(reads) from ch_reads

output:
path foo into ch_output

script:
zcat ${reads[0]} | head -900000 > truncated_file_1
gzip truncated_file_1
mv truncated_file_1.gz truncated_${reads[0]}
zcat ${reads[1]} | head -900000 > truncated_file_2
gzip truncated_file_2
mv truncated_file_2.gz truncated_${reads[1]}

}

我使用 sed 的代码是:(输入和输出相同,只是脚本有所变化)

script:
sed -i -n '1,900000p' ${reads[0]}
sed -i -n '1,900000p' ${reads[1]}

所以我的问题是:我应该如何修改fastq文件并在进程内截断它?我的目标只是在这个过程之后用一个较小的 fastq 文件执行整个管道。

谢谢

所以!

经过一些测试,似乎正如@tripleee 所说,使用 head 时出现错误信号。

我不完全理解这背后的动机,但是this answer在另一个问题中发现,似乎至少解决了我的问题。

所以最终的代码是这样的:

process TRUNCATE_FASTQ {

input:
set val(sample), val(single_end), path(reads) from ch_reads

output:
path foo into ch_output

script:
zcat ${reads[0]}  | awk '(NR<=900000)' > truncated_file_1
gzip truncated_file_1

您可能在 nextflow.config:

中添加了如下内容
process {

  shell = [ '/bin/bash', '-euo', 'pipefail' ]
}

你得到一个错误,因为 已经提前关闭了它的目的地,并且无法(成功地)写入解压缩的文件。它无法知道这是故意的,或者是否发生了不好的事情(例如磁盘不足 space、网络错误等)。

如果您不想更改 shell 选项,您可以改用读取整个输入流的工具。这就是您的 解决方案所做的,与以下内容相同:

zcat test.fastq.gz | awk 'NR<=900000 { print [=11=] }'

对于每条输入记录,AWK 测试当前记录号 (NR) 以查看它是否小于 900000。如果小于,则将整行打印到标准输出。如果不是,请尝试查看下一行是否小于 900000。此解决方案的问题是,如果您的输入 FASTQ 非常大,这将很慢。所以你可能会想使用:

zcat test.fastq.gz | awk 'NR>900000 { exit }'

但这与最初的 解决方案非常相似,并且会在您当前的 shell 选项中产生错误。更好的解决方案(让您保留当前的 ​​shell 选项)是使用进程替换来避免管道:

head -n 900000 < <(zcat test.fastq.gz)

您的 Nextflow 代码可能如下所示:

nextflow.enable.dsl=2


params.max_lines = 900000

process TRUNCATE_FASTQ {

    input:
    tuple val(sample), path(reads)

    script:
    def (fq1, fq2) = reads

    """
    head -n ${params.max_lines} < <(zcat "${fq1}") | gzip > "truncated_${fq1}"
    head -n ${params.max_lines} < <(zcat "${fq2}") | gzip > "truncated_${fq2}"
    """
}

workflow {

    TRUNCATE_FASTQ( Channel.fromFilePairs('*.{1,2}.fastq.gz') )
}

如果您有 Python,它具有本机 gzip 支持,您也可以为此使用 pysam。例如:

params.max_records = 225000

process TRUNCATE_FASTQ {

    container 'quay.io/biocontainers/pysam:0.16.0.1--py37hc334e0b_1'

    input:
    tuple val(sample), path(reads)

    script:
    def (fq1, fq2) = reads

    """
    #!/usr/bin/env python
    import gzip
    import pysam

    def truncate(fn, num):
        with pysam.FastxFile(fn) as ifh:
            with gzip.open(f'truncated_{fn}', 'wt') as ofh:
                for idx, entry in enumerate(ifh):
                    if idx >= num:
                        break
                    print(str(entry), file=ofh)

    truncate('${fq1}', ${params.max_records})
    truncate('${fq2}', ${params.max_records})
    """
}

或使用Biopython:

params.max_records = 225000

process TRUNCATE_FASTQ {

    container 'quay.io/biocontainers/biopython:1.78'

    input:
    tuple val(sample), path(reads)

    script:
    def (fq1, fq2) = reads

    """
    #!/usr/bin/env python
    import gzip
    from itertools import islice
    from Bio import SeqIO

    def xopen(fn, mode='r'):
        if fn.endswith('.gz'):
            return gzip.open(fn, mode) 
        else:
            return open(fn, mode)

    def truncate(fn, num):
        with xopen(fn, 'rt') as ifh, xopen(f'truncated_{fn}', 'wt') as ofh:
            seqs = islice(SeqIO.parse(ifh, 'fastq'), num)
            SeqIO.write(seqs, ofh, 'fastq')

    truncate('${fq1}', ${params.max_records})
    truncate('${fq2}', ${params.max_records})
    """
}