在 Nextflow (DSL2) 管道中组合互斥进程的输出

Combine outputs of mutually exclusive processes in a Nextflow (DSL2) pipeline

我在 Nextflow 中设置了一个 DSL2 工作流程,如下所示:


nextflow.enable.dsl=2

// process 1, mutually exclusive with process 2 below
process bcl {

    tag "bcl2fastq"
    publishDir params.outdir, mode: 'copy', pattern: 'fastq/**fastq.gz'
    publishDir params.outdir, mode: 'copy', pattern: 'fastq/Stats/*'
    publishDir params.outdir, mode: 'copy', pattern: 'InterOp/*'
    publishDir params.outdir, mode: 'copy', pattern: 'Run*.xml'
    beforeScript 'export PATH=/opt/tools/bcl2fastq/bin:$PATH'

    input:
        path runfolder
        path samplesheet

    output:
        path 'fastq/Stats/', emit: bcl_ch
        path 'fastq/**fastq.gz', emit: fastqc_ch
        path 'InterOp/*', emit: interop_ch
        path 'Run*.xml'
    script: 
        // processing omitted
    }

// Process 2, note the slightly different outputs
process bcl_convert {
tag "bcl-convert"
    publishDir params.outdir, mode: 'copy', pattern: 'fastq/**fastq.gz'
    publishDir params.outdir, mode: 'copy', pattern: 'fastq/Reports/*'
    publishDir params.outdir, mode: 'copy', pattern: 'InterOp/*'
    publishDir params.outdir, mode: 'copy', pattern: 'Run*.xml'
    beforeScript 'export PATH=/opt/tools/bcl-convert/:$PATH'

    input:
        path runfolder
        path samplesheet

    output:
        path 'fastq/Reports/', emit: bcl_ch
        path 'fastq/**fastq.gz', emit: fastqc_ch
        path 'InterOp/', emit: interop_ch
        path 'Run*.xml'

    script:
        // processing omitted
}

// downstream process that needs either the first or the second to work, agnostic
process fastqc {
    cpus 12

    publishDir "${params.outdir}/", mode: "copy"

    module 'conda//anaconda3'
    conda '/opt/anaconda3/envs/tools/'

    input:
        path fastq_input
    output:
        path "fastqc", emit: fastqc_output

    script:
    """
    mkdir -p fastqc
    fastqc -t ${task.cpus} $fastq_input -o fastqc
    """

}

现在我有一个变量 params.bcl_convert 可以用来从一个进程切换到另一个进程,我设置了这样的工作流:

workflow {
    runfolder_repaired = "${params.runfolder}".replaceFirst(/$/, "/")

    runfolder = Channel.fromPath(runfolder_repaired, type: 'dir')
    sample_data = Channel.fromPath(params.samplesheet, type: 'file')

    if (!params.bcl_convert) {
       bcl(runfolder, sample_data)
    } else {
        bcl_convert(runfolder, sample_data)
    }

    fastqc(bcl.out.mix(bcl_convert.out)) // Problematic line
}

问题在于有问题的行:我不确定如何(以及是否可能)让 fastqc 获得 bcl2fastqbcl_convert 的输入(但是 fastq_ch,而不是其余的)不管生成它的过程如何。

我尝试过的一些事情包括(受 https://github.com/nextflow-io/nextflow/issues/1646 启发,但那个使用过程的输出):

    if (!params.bcl_convert) {
       def bcl_out = bcl(runfolder, sample_data).out
    } else {
        def bcl_out = bcl_convert(runfolder, sample_data).out
    }

    fastqc(bcl_out.fastq_ch)

但是,即使使用与 post:

类似的方法,编译也会失败 Variable "runfolder" already defined in the process scope
def result_bcl2fastq = !params.bclconvert ? bcl(runfolder, sample_data): Channel.empty()
def result_bclconvert = params.bclconvert ? bcl_convert(runfolder, sample_data): Channel.empty()

我考虑过在单个脚本中使用条件,但是两个进程的输出不同,所以这不太可能。 我让它工作的唯一方法是复制所有输出,例如:

if (!params.bcl_convert) {
   bcl(runfolder, sample_data)
   fastqc(bcl.out.fastqc_ch)
} else {
   bcl_convert(runfolder, sample_data)
   fastqc(bcl_convert.out.fastqc_ch
}

然而,这在我看来是不必要的并发症。我想做的事情真的可行吗?

经过大量的反复试验,我终于弄明白了。

将变量分配给流程输出就像上述流程的 .out 属性 一样。所以我为两个独占进程设置了相同的变量,设置了相同的输出(如问题中所示)然后直接访问它们而不使用 .out:

workflow {

    runfolder_repaired = "${params.runfolder}".replaceFirst(/$/, "/")

    runfolder = Channel.fromPath(
        runfolder_repaired, type: 'dir')

    sample_data = Channel.fromPath(
        params.samplesheet, type: 'file')

    if (!params.bcl_convert) {
       bcl_out = bcl2fastq(runfolder, sample_data)
    } else {
       bcl_out = bcl_convert(runfolder, sample_data)
    }
    fastqc(bcl_out.fastqc_ch)
}