基准通道创建 NextFlow

Benchmark channel creation NextFlow

我正在 NextFlow 上执行分散-聚集操作。

看起来像下面这样:

reads = PATH+"test_1.fq"
outdir = "results"

split_read_ch = channel.fromFilePairs(reads, checkIfExists: true, flat:true ).splitFastq( by: 10, file:"test_split" )

process Scatter_fastP {
    tag 'Scatter_fastP'
    publishDir outdir

    input:
    tuple val(name), path(reads) from split_read_ch
    
    output:
    file "${reads}.trimmed.fastq" into gather_fatsp_ch

    script:
    """
    fastp -i ${reads} -o ${reads}.trimmed.fastq
    """
}


gather_fatsp_ch.collectFile().view().println{ it.text }

我 运行 此代码包含 Nextflow (https://www.nextflow.io/docs/latest/tracing.html) 提出的所有基准选项: nextflow run main.nf -with-report nextflow_report -with-trace nextflow_trace -with-timeline nextflow_timeline -with-dag nextflow_dag.html

在这些跟踪文件中,我可以找到 10 个 Scatter_fastP 进程的资源和速度。 但我还想衡量创建 split_read_chgather_fastp_ch 频道的资源和速度。

我曾尝试将频道的创建包含在流程中,但我找不到使其正常工作的解决方案。 有没有办法将通道创建包含到跟踪文件中?还是我没有找到将这些通道创建到流程中的方法?

预先感谢您的帮助。

虽然 Nextflow 可以解析 FASTQ 文件并将它们拆分成更小的文件等,但通常最好将这些操作传递给另一个进程或一组进程,尤其是当您的输入 FASTQ 文件很大时。这有两个好处:(1) 您的主要 nextflow 流程不需要那么努力地工作,以及 (2) 您可以在 nextflow 报告中获得详细的任务流程统计信息。

以下示例使用 GNU split 拆分输入的 FASTQ 文件,并使用 groupTuple() 运算符和 groupKey() 内置函数收集输出以尽快流式传输收集的值。您需要适应您的非 gzip 压缩输入:

nextflow.enable.dsl=2

params.num_lines = 40000
params.suffix_length = 5

process split_fastq {

    input:
    tuple val(name), path(fastq)

    output:
    tuple val(name), path("${name}-${/[0-9]/*params.suffix_length}.fastq.gz")

    shell:
    '''
    zcat "!{fastq}" | split \
        -a "!{params.suffix_length}" \
        -d \
        -l "!{params.num_lines}" \
        --filter='gzip > ${FILE}.fastq.gz' \
        - \
        "!{name}-"
    '''
}

process fastp {

    input:
    tuple val(name), path(fastq)

    output:
    tuple val(name), path("${fastq.getBaseName(2)}.trimmed.fastq.gz")

    """
    fastp -i "${fastq}" -o "${fastq.getBaseName(2)}.trimmed.fastq.gz"
    """
}

workflow {

    Channel.fromFilePairs( './data/*.fastq.gz', size: 1 ) \
        | split_fastq \
        | map { name, fastq -> tuple( groupKey(name, fastq.size()), fastq ) } \
        | transpose() \
        | fastp \
        | groupTuple() \
        | map { key, fastqs -> tuple( key.toString(), fastqs ) } \
        | view()
}