Nextflow: Not all items in channel used by process
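I have been struggling to work out why a Nextflow (v20.10.00) process is not using all the items in a channel. I want the process to run once for each sample BAM file (10 in total) and each chromosome (3 in total), i.e. 30 tasks.
Here is how the channels and the process are created: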
ref_genome = file( params.RefGen, checkIfExists: true )
ref_dir = ref_genome.getParent()
ref_name = ref_genome.getBaseName()
ref_dict = file( "${ref_dir}/${ref_name}.dict", checkIfExists: true )
ref_index = file( "${ref_dir}/${ref_name}.*.fai", checkIfExists: true )
// Handles reading in data if the previous step is skipped
if( params.Skip_BP ){
    Channel
        .fromFilePairs("${params.ProcBamDir}/*{bam,bai}") { file -> file.name.replaceAll(/.bam|.bai$/,'') }
        .ifEmpty { error "No bams found in ${params.ProcBamDir}" }
        .map { ID, files -> tuple(ID, files[0], files[1]) }
        .set { processed_bams }
}

// Setting up the chromosome channel
if( params.Chroms == "" ){
    // Defaulting to using all chromosomes
    chromosomes_ch = Channel
        .from("AgamP4_2L", "AgamP4_2R", "AgamP4_3L", "AgamP4_3R", "AgamP4_X", "AgamP4_Y_unplaced", "AgamP4_UNKN")
    println "No chromosomes specified, using all major chromosomes: AgamP4_2L, AgamP4_2R, AgamP4_3L, AgamP4_3R, AgamP4_X, AgamP4_Y_unplaced, AgamP4_UNKN"
} else {
    // User option to choose which chromosome will be used
    // This worked with the following syntax nextflow run testing.nf --profile imperial --Chroms "AgamP4_3R,AgamP4_2L"
    chrs = params.Chroms.split(",")
    chromosomes_ch = Channel
        .from( chrs )
    println "User defined chromosomes set: ${params.Chroms}"
}

process DNA_HCG {
    errorStrategy { sleep(Math.pow(2, task.attempt) * 600 as long); return 'retry' }
    maxRetries 3
    maxForks params.HCG_Forks
    tag { SampleID+"-"+chrom }
    executor = 'pbspro'
    clusterOptions = "-lselect=1:ncpus=${params.HCG_threads}:mem=${params.HCG_memory}gb:mpiprocs=1:ompthreads=${params.HCG_threads} -lwalltime=${params.HCG_walltime}:00:00"
    publishDir(
        path: "${params.HCDir}",
        mode: 'copy',
    )

    input:
    each chrom from chromosomes_ch
    set SampleID, path(bam), path(bai) from processed_bams
    path ref_genome
    path ref_dict
    path ref_index

    output:
    tuple chrom, path("${SampleID}-${chrom}.vcf") into HCG_ch
    path("${SampleID}-${chrom}.vcf.idx") into idx_ch

    beforeScript 'module load anaconda3/personal; source activate NF_GATK'

    script:
    """
    if [ ! -d tmp ]; then mkdir tmp; fi
    taskset -c 0-${params.HCG_threads} gatk --java-options \"-Xmx${params.HCG_memory}G -XX:+UseParallelGC -XX:ParallelGCThreads=${params.HCG_threads}\" HaplotypeCaller \
    --tmp-dir tmp/ \
    --pair-hmm-implementation AVX_LOGLESS_CACHING_OMP \
    --native-pair-hmm-threads ${params.HCG_threads} \
    -ERC GVCF \
    -L ${chrom} \
    -R ${ref_genome} \
    -I ${bam} \
    -O ${SampleID}-${chrom}.vcf ${params.GVCF_args}
    """
}
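But for reasons I cannot figure out, Nextflow only creates 3 jobs:
[d8/45499b] process > DNA_HCG (0_wt5_BP-CM029350.1) [ 0%] 0 of 3
I thought it might be because it only takes the first sample and then runs one task per chromosome, although I doubted this because the same code works correctly with a different reference genome. Regardless, I adjusted the input channel: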
processed_bams
    .combine(chromosomes_ch)
    .set { HCG_in }
and
input:
set SampleID, path(bam), path(bai), chrom from HCG_in
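But this resulted in only one job being created:
[6e/78b070] process > DNA_HCG (0_wt10_BP-CM029350.1) [ 0%] 0 of 1
Confusingly, when I call HCG_in.view(), there are 30 items. What confuses me even more is that the following code produces the correct number of jobs: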
chrs = params.Chroms.split(",")
chromosomes_ch = Channel
    .from(chrs)

Channel
    .fromFilePairs("${params.ProcBamDir}/*{bam,bai}") { file -> file.name.replaceAll(/.bam|.bai$/,'') }
    .ifEmpty { error "No bams found in ${params.ProcBamDir}" }
    .map { ID, files -> tuple(ID, files[0], files[1]) }
    .set { processed_bams }

process HCG {
    executor 'local'

    input:
    each chrom from chromosomes_ch
    set SampleID, path(bam), path(bai) from processed_bams
    //set SampleID, path(bam), path(bai), chrom from HCG_in

    script:
    """
    echo "${SampleID} - ${chrom}"
    """
}
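Output: [75/c1c25a] process > HCG (27) [100%] 30 of 30 ✔
I am hoping I have just missed something obvious, but I cannot see it at the moment. Thanks in advance for your help.
Problems like this almost always involve the use of multiple input channels: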
When two or more channels are declared as process inputs, the process
stops until there’s a complete input configuration ie. it receives an
input value from all the channels declared as input.
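The rule quoted above can be illustrated with a minimal DSL1 sketch. The channel and process names here (numbers_ch, letters_ch, pairing_demo) are hypothetical and not part of the question's pipeline. Under Nextflow 20.10 this should launch a single task, because the second queue channel runs out after one item:

```groovy
// Hypothetical minimal example (DSL1): two queue channels of unequal length.
numbers_ch = Channel.from(1, 2, 3)   // three items
letters_ch = Channel.from('a')       // one item

process pairing_demo {
    echo true   // print each task's stdout to the console

    input:
    val n from numbers_ch
    val l from letters_ch

    script:
    """
    echo "${n} ${l}"
    """
}
```

Items 2 and 3 of numbers_ch are never consumed, since no further value arrives on letters_ch to complete an input configuration.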
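Your initial assessment was correct. However, the reason only three tasks were run (i.e. one sample for each of the three chromosomes) is that the following line (probably) returned a list (i.e. a Java LinkedList) containing a single element, and a list used as a process input behaves like a queue channel: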
ref_index = file( "${ref_dir}/${ref_name}.*.fai", checkIfExists: true )
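You may have expected this to return a UnixPath, but because the pattern contains a glob wildcard, file() returns a list of matches even when only one file matches. Ultimately, the solution is to ensure that ref_index is a value channel.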
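As a rough sketch of what that could look like, here are two possible ways to do it, assuming the glob matches exactly one .fai file as in the question. The name ref_index_ch is hypothetical, and neither variant has been tested against the original pipeline:

```groovy
// Option 1: file() with a glob returns a list, so take its only element.
// A single path used as a process input is treated as a value, like
// ref_genome and ref_dict. Note that first() throws if nothing matched.
ref_index = file( "${ref_dir}/${ref_name}.*.fai", checkIfExists: true ).first()

// Option 2: keep the list but wrap it explicitly in a value channel
// (ref_index_ch is a hypothetical name), then read from it in the process:
//     path ref_index from ref_index_ch
ref_index_ch = Channel.value( file( "${ref_dir}/${ref_name}.*.fai", checkIfExists: true ) )
```

Either way the index is no longer a one-item queue, so it can be reused for every sample and chromosome combination instead of being exhausted after the first sample.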