Combine outputs of mutually exclusive processes in a Nextflow (DSL2) pipeline
I have set up a DSL2 workflow in Nextflow as follows:
nextflow.enable.dsl=2

// process 1, mutually exclusive with process 2 below
process bcl {
    tag "bcl2fastq"
    publishDir params.outdir, mode: 'copy', pattern: 'fastq/**fastq.gz'
    publishDir params.outdir, mode: 'copy', pattern: 'fastq/Stats/*'
    publishDir params.outdir, mode: 'copy', pattern: 'InterOp/*'
    publishDir params.outdir, mode: 'copy', pattern: 'Run*.xml'
    beforeScript 'export PATH=/opt/tools/bcl2fastq/bin:$PATH'

    input:
    path runfolder
    path samplesheet

    output:
    path 'fastq/Stats/', emit: bcl_ch
    path 'fastq/**fastq.gz', emit: fastqc_ch
    path 'InterOp/*', emit: interop_ch
    path 'Run*.xml'

    script:
    // processing omitted
}

// Process 2, note the slightly different outputs
process bcl_convert {
    tag "bcl-convert"
    publishDir params.outdir, mode: 'copy', pattern: 'fastq/**fastq.gz'
    publishDir params.outdir, mode: 'copy', pattern: 'fastq/Reports/*'
    publishDir params.outdir, mode: 'copy', pattern: 'InterOp/*'
    publishDir params.outdir, mode: 'copy', pattern: 'Run*.xml'
    beforeScript 'export PATH=/opt/tools/bcl-convert/:$PATH'

    input:
    path runfolder
    path samplesheet

    output:
    path 'fastq/Reports/', emit: bcl_ch
    path 'fastq/**fastq.gz', emit: fastqc_ch
    path 'InterOp/', emit: interop_ch
    path 'Run*.xml'

    script:
    // processing omitted
}

// downstream process that needs either the first or the second to work, agnostic
process fastqc {
    cpus 12
    publishDir "${params.outdir}/", mode: "copy"
    module 'conda//anaconda3'
    conda '/opt/anaconda3/envs/tools/'

    input:
    path fastq_input

    output:
    path "fastqc", emit: fastqc_output

    script:
    """
    mkdir -p fastqc
    fastqc -t ${task.cpus} $fastq_input -o fastqc
    """
}
I now have a parameter, `params.bcl_convert`, that can be used to switch from one process to the other, and I set up the workflow like this:
workflow {
    runfolder_repaired = "${params.runfolder}".replaceFirst(/$/, "/")
    runfolder = Channel.fromPath(runfolder_repaired, type: 'dir')
    sample_data = Channel.fromPath(params.samplesheet, type: 'file')
    if (!params.bcl_convert) {
        bcl(runfolder, sample_data)
    } else {
        bcl_convert(runfolder, sample_data)
    }
    fastqc(bcl.out.mix(bcl_convert.out)) // Problematic line
}
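The problem is the line marked as problematic: I'm not sure how (or whether it is even possible) to have `fastqc` take its input from either bcl2fastq (the `bcl` process) or `bcl_convert` (but only the `fastqc_ch` output, not the rest), regardless of which process generated it.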
Some of the things I've tried include the following (inspired by https://github.com/nextflow-io/nextflow/issues/1646, although that one uses the output of the process):
if (!params.bcl_convert) {
    def bcl_out = bcl(runfolder, sample_data).out
} else {
    def bcl_out = bcl_convert(runfolder, sample_data).out
}
fastqc(bcl_out.fastq_ch)
However, compilation fails with `Variable "runfolder" already defined in the process scope`, even with an approach similar to the one in that issue:
def result_bcl2fastq = !params.bclconvert ? bcl(runfolder, sample_data): Channel.empty()
def result_bclconvert = params.bclconvert ? bcl_convert(runfolder, sample_data): Channel.empty()
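I considered using a conditional inside a single script, but the outputs of the two processes differ, so that isn't really feasible.
The only way I got it to work was by duplicating the downstream call in each branch, e.g.: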
if (!params.bcl_convert) {
    bcl(runfolder, sample_data)
    fastqc(bcl.out.fastqc_ch)
} else {
    bcl_convert(runfolder, sample_data)
    fastqc(bcl_convert.out.fastqc_ch)
}
However, this seems like an unnecessary complication to me. Is what I'm trying to do actually possible?
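After a lot of trial and error, I finally figured it out.
The value returned by a process call behaves like the `.out` property of that process. So I assigned the result of both mutually exclusive processes to the same variable, gave them the same output names (as in the question), and then accessed the outputs directly without using `.out`: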
workflow {
    runfolder_repaired = "${params.runfolder}".replaceFirst(/$/, "/")
    runfolder = Channel.fromPath(runfolder_repaired, type: 'dir')
    sample_data = Channel.fromPath(params.samplesheet, type: 'file')
    if (!params.bcl_convert) {
        // the process is declared as `bcl` above (bcl2fastq is only its tag)
        bcl_out = bcl(runfolder, sample_data)
    } else {
        bcl_out = bcl_convert(runfolder, sample_data)
    }
    fastqc(bcl_out.fastqc_ch)
}
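To try the pattern in isolation, here is a minimal toy pipeline. It is only a sketch, not the pipeline from the question: `tool_a`, `tool_b`, `downstream`, and `params.use_b` are hypothetical stand-ins for `bcl`, `bcl_convert`, `fastqc`, and `params.bcl_convert`. It assumes, as in the answer above, that both upstream processes declare an output with the same `emit` name. The variable is assigned without `def` inside the branches, because a `def` inside an `if` block would be scoped to that block under normal Groovy rules and would not be visible afterwards.

```nextflow
nextflow.enable.dsl=2

// Hypothetical switch, standing in for params.bcl_convert
params.use_b = false

// Two stand-in upstream processes. Their outputs differ,
// but both expose one output under the shared emit name 'fastqc_ch'.
process tool_a {
    input:
    val sample

    output:
    path 'a_*.txt', emit: fastqc_ch
    path 'a.log', emit: log_ch

    script:
    """
    echo "${sample}" > a_${sample}.txt
    echo "tool_a ran" > a.log
    """
}

process tool_b {
    input:
    val sample

    output:
    path 'b_*.txt', emit: fastqc_ch
    path 'b.report', emit: report_ch

    script:
    """
    echo "${sample}" > b_${sample}.txt
    echo "tool_b ran" > b.report
    """
}

// Stand-in for the downstream process; it only sees the shared output
process downstream {
    input:
    path infile

    output:
    stdout

    script:
    """
    cat ${infile}
    """
}

workflow {
    samples = Channel.of('s1', 's2')

    // Only one branch runs, so only one process is ever invoked
    if (params.use_b) {
        upstream = tool_b(samples)
    } else {
        upstream = tool_a(samples)
    }

    // Access the shared emit name on the returned value, not on <process>.out
    downstream(upstream.fastqc_ch)
    downstream.out.view()
}
```

Saved under a name of your choice (say `main.nf`), it can be run with `nextflow run main.nf` for the first branch, or `nextflow run main.nf --use_b` to switch to the second. The workflow body never refers to the `.out` of a process that was not invoked.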