将以前的 bash 调度程序提交脚本合并到 NextFlow 工作流的最类似于 NextFlow (DSL2) 的方式

The most NextFlow-like (DSL2) way to incorporate a former bash scheduler submission script to a NextFlow workflow

这里是 NextFlow 的新手,并且在一些基本概念上苦苦挣扎。我正在将一组 bash 脚本从 a previous publication 转换为 NextFlow 工作流程。

我正在转换 a simple bash script(为方便起见包含在下面),它做了一些基本的准备工作,并为每次迭代向集群调度程序提交了一个新作业。

终极问题:将此脚本合并到 NextFlow 工作流程(最好使用新的 DSL2 模式)的最类似于 NextFlow 的方式是什么?

可能的子问题:是否可以发出基于bash变量的列表列表?我见过将列表从工作流 传递到 进程的方法,但不是 进程传递出去的方法。我可以将每组参数打印到一个文件中,然后发出该文件,但这看起来不太像 NextFlow。

对于如何将以下 bash 脚本合并到 NextFlow 工作流中的任何指导,我将不胜感激。我添加了注释并指明了我需要作为一组参数发出的四个变量。

谢谢!

# Input variables. I know how to take these in.
GVCF_DIR=
GATK_bed=
RESULT_DIR=
CAMO_MASK_REF_PREFIX=
GATK_JAR=

# For each directory
for dir in ${GVCF_DIR}/*
do
    # Do some some basic prep work defining
    # variables and setting up results directory
    ploidy=$(basename $dir)
    repeat=$((${ploidy##*_} / 2))
    result_dir="${RESULT_DIR}/genotyped_by_region/${ploidy}"  # Needs to be emitted
    mkdir -p $result_dir

    # Create a new file with a list of files. This file
    # will be used as input to the downstream NextFlow process
    gvcf_list="${ploidy}.gvcfs.list"                          # Needs to be emitted
    find $dir -name "*.g.vcf" > $gvcf_list 

    REF="${CAMO_MASK_REF_PREFIX}.${ploidy}.fa"                # Needs to be emitted

    # For each line in the $GATK_bed file where 
    # column 5 == repeat (defined above), submit
    # a new job to the scheduler with that region.
    awk "$5 == $repeat {print $1\":\"$2\"-\"$3}" $GATK_bed | \
        while read region                                     # Needs to be emitted
        do 
            qsub combine_and_genotype.ogs \
                    $gvcf_list \
                    $region \
                    $result_dir \
                    $REF \
                    $GATK_JAR
        done
done

What is the most NextFlow-like way to incorporate this script into a NextFlow workflow

在某些情况下,可以合并不需要“按原样”编译的 third-party scripts,方法是使它们可执行并将它们移动到根目录中名为 'bin' 的文件夹中您的项目存储库的目录。 Nextflow自动将此文件夹添加到执行环境的$PATH中。

但是,某些脚本不适合以这种方式收录。如果 objective 是为了生成可移植且可重现的工作流程,情况尤其如此,这就是我解释“最像 Nextflow 的方式”的方式。 objective 最终变成了 运行 每个过程步骤如何孤立。鉴于您的示例,以下是我对此的看法:

nextflow.enable.dsl=2

params.GVCF_DIRECTORY = './path/to/directories'
params.GATK_BED_FILE = './path/to/file.bed'
params.CAMO_MASK_REF_PREFIX = 'someprefix'

params.publish_dir = './results'
process combine_and_genotype {

    publishDir "${params.publish_dir}/${dirname}"

    container 'quay.io/biocontainers/gatk4:4.2.4.1--hdfd78af_0'

    cpus 1
    memory 40.GB

    input:
    tuple val(dirname), val(region_string), path(ref_fasta), path(gvcf_files)

    output:
    tuple val(dirname), val(region_string), path("full_cohort.combined.${region}.g.vcf")

    script:
    region = region_string.replaceAll(':', '_')

    def avail_mem = task.memory ? task.memory.toGiga() : 0

    def Xmx = avail_mem >= 8 ? "-Xmx${avail_mem - 1}G" : ''
    def Xms = avail_mem >= 8 ? "-Xms${avail_mem.intdiv(2)}G" : ''

    """
    cat << __EOF__ > "${dirname}.gvcf.list"
    ${gvcf_files.join('\n'+' '*4)}
    __EOF__

    gatk \
        --java-options "${Xmx} ${Xms} -XX:+UseSerialGC" \
        CombineGVCFs \
        -R "${ref_fasta}" \
        -L "${region_string}" \
        -O "full_cohort.combined.${region}.g.vcf" \
        -V "${dirname}.gvcf.list"

    gatk \
        --java-options "${Xmx} ${Xms} -XX:+UseSerialGC" \
        GenotypeGVCFs \
        -R "${ref_fasta}" \
        -L "${region_string}" \
        -O "full_cohort.combined.${region}.vcf" \
        -V "full_cohort.combined.${region}.g.vcf" \
        -A GenotypeSummaries
    """
}
workflow {

    GVCF_DIRECTORY = file( params.GVCF_DIRECTORY )
    GATK_BED_FILE = file( params.GATK_BED_FILE )

    Channel.fromPath( params.GATK_BED_FILE ) \
        | splitCsv(sep: '\t') \
        | map { row ->
            tuple( row[4].toInteger(), "${row[0]}:${row[1]}-${row[2]}" )
        } \
        | set { regions }

    Channel.fromPath( "${GVCF_DIRECTORY.toString()}/**/*.g.vcf" ) \
        | map { tuple( GVCF_DIRECTORY.relativize(it).subpath(0,1).name, it ) } \
        | groupTuple() \
        | map { dirname, gvcf_files ->
            def ploidy = dirname.replaceFirst(/^.*_/, "").toInteger()
            def repeat = ploidy.intdiv(2)

            def ref_fasta = file( "${params.CAMO_MASK_REF_PREFIX}.${dirname}.fa" )

            tuple( repeat, dirname, ref_fasta, gvcf_files )
        } \
        | combine( regions, by: 0 ) \
        | map { repeat, dirname, ref_fasta, gvcf_files, region ->
            tuple( dirname, region, ref_fasta, gvcf_files )
        } \
        | combine_and_genotype
}

从 GATK 文档中,我实际上看不到变体输入在哪里可以是文件列表。也许此功能只能使用较旧的 GATK。上面的代码未经测试。

另外,您需要确保您的代码使用四个空格缩进。如果使用制表符缩进,或者如果您要使用不同数量的空格进行缩进,以上代码将引发一些错误。