Nextflow:并非流程使用的通道中的所有项目

Nextflow: Not all items in channel used by process

我一直在努力确定为什么 nextflow (v20.10.00) 进程没有使用通道中的所有项目。我希望每个样本 bam 文件(总共 10 个)和每个染色体(总共 3 个)的过程 运行。

下面是频道的创建和过程:

ref_genome = file( params.RefGen, checkIfExists: true )
ref_dir    = ref_genome.getParent()
ref_name   = ref_genome.getBaseName()
ref_dict   = file( "${ref_dir}/${ref_name}.dict", checkIfExists: true )
ref_index  = file( "${ref_dir}/${ref_name}.*.fai", checkIfExists: true )

// Handles reading in data if the previous step is skipped
if( params.Skip_BP ){
  Channel
    .fromFilePairs("${params.ProcBamDir}/*{bam,bai}") { file -> file.name.replaceAll(/.bam|.bai$/,'') }
    .ifEmpty { error "No bams found in ${params.ProcBamDir}" }
    .map { ID, files -> tuple(ID, files[0], files[1]) }
    .set { processed_bams }
}
// Setting up the chromosome channel
if( params.Chroms == "" ){
  // Defaulting to using all chromosomes
  chromosomes_ch = Channel
                      .from("AgamP4_2L", "AgamP4_2R", "AgamP4_3L", "AgamP4_3R", "AgamP4_X", "AgamP4_Y_unplaced", "AgamP4_UNKN")
  println "No chromosomes specified, using all major chromosomes: AgamP4_2L, AgamP4_2R, AgamP4_3L, AgamP4_3R, AgamP4_X, AgamP4_Y_unplaced, AgamP4_UNKN"
} else {
  // User option to choose which chromosome will be used
  // This worked with the following syntax nextflow run testing.nf --profile imperial --Chroms "AgamP4_3R,AgamP4_2L"
  chrs = params.Chroms.split(",")
  chromosomes_ch = Channel
                    .from( chrs )
  println "User defined chromosomes set: ${params.Chroms}"
}


process DNA_HCG {
  errorStrategy { sleep(Math.pow(2, task.attempt) * 600 as long); return 'retry' }
  maxRetries 3
  maxForks params.HCG_Forks

  tag { SampleID+"-"+chrom }

  executor = 'pbspro'
  clusterOptions = "-lselect=1:ncpus=${params.HCG_threads}:mem=${params.HCG_memory}gb:mpiprocs=1:ompthreads=${params.HCG_threads} -lwalltime=${params.HCG_walltime}:00:00"

  publishDir(
    path: "${params.HCDir}",
    mode: 'copy',
  )

  input:
  each chrom from chromosomes_ch
  set SampleID, path(bam), path(bai) from processed_bams
  path ref_genome
  path ref_dict
  path ref_index

  output:
  tuple chrom, path("${SampleID}-${chrom}.vcf") into HCG_ch
  path("${SampleID}-${chrom}.vcf.idx") into idx_ch
  
  beforeScript 'module load anaconda3/personal; source activate NF_GATK'

  script:
  """
  if [ ! -d tmp ]; then mkdir tmp; fi
  taskset -c 0-${params.HCG_threads} gatk --java-options \"-Xmx${params.HCG_memory}G -XX:+UseParallelGC -XX:ParallelGCThreads=${params.HCG_threads}\" HaplotypeCaller \
    --tmp-dir tmp/ \
    --pair-hmm-implementation AVX_LOGLESS_CACHING_OMP \
    --native-pair-hmm-threads ${params.HCG_threads} \
    -ERC GVCF \
    -L ${chrom} \
    -R ${ref_genome} \
    -I ${bam} \
    -O ${SampleID}-${chrom}.vcf ${params.GVCF_args}
  """
}

但由于我无法弄清楚的原因,nextflow 只创造了 3 个工作:[d8/45499b] process > DNA_HCG (0_wt5_BP-CM029350.1) [ 0%] 0 of 3

我想可能是因为它只取了第一个样本,然后对每个染色体进行了一个处理。尽管我对此表示怀疑,因为该代码正确地适用于不同的参考基因组。不管怎样,我调整了输入通道:

processed_bams
  .combine(chromosomes_ch)
  .set { HCG_in }

input:
set SampleID, path(bam), path(bai), chrom from HCG_in

但这导致只创建了一个工作:[6e/78b070] process > DNA_HCG (0_wt10_BP-CM029350.1) [ 0%] 0 of 1

令人困惑的是,当我使用 HCG_in.view() 时,有 30 个项目。更让我困惑的是,正确的工作数量来自以下代码:

chrs = params.Chroms.split(",")
      chromosomes_ch = Channel
                         .from(chrs)

Channel
  .fromFilePairs("${params.ProcBamDir}/*{bam,bai}") { file -> file.name.replaceAll(/.bam|.bai$/,'') }
  .ifEmpty { error "No bams found in ${params.ProcBamDir}" }
  .map { ID, files -> tuple(ID, files[0], files[1]) }
  .set { processed_bams }

process HCG {
  executor 'local'

  input:
  each chrom from chromosomes_ch
  set SampleID, path(bam), path(bai) from processed_bams
  //set SampleID, path(bam), path(bai), chrom from HCG_in

  script:
  """
  echo "${SampleID} - ${chrom}"
  """
}

输出:[75/c1c25a] process > HCG (27) [100%] 30 of 30 ✔

我希望我只是错过了一些明显的东西,但我现在看不到它。在此先感谢您的帮助。

像这样的问题几乎总是涉及使用multiple input channels:

When two or more channels are declared as process inputs, the process stops until there’s a complete input configuration ie. it receives an input value from all the channels declared as input.

您的初步评估是正确的。然而,只有三个过程是 运行 的原因(即三个染色体中的每一个的一个样本),是因为这一行(可能)returned 了一个列表(即一个 java LinkedList)包含一个 单个 元素,列表的行为类似于队列通道:

ref_index  = file( "${ref_dir}/${ref_name}.*.fai", checkIfExists: true )

您可能期望此为 return UnixPath。最终,解决方案是确保 ref_index 是价值渠道。