将以前的 bash 调度程序提交脚本合并到 NextFlow 工作流的最类似于 NextFlow (DSL2) 的方式
The most NextFlow-like (DSL2) way to incorporate a former bash scheduler submission script to a NextFlow workflow
这里是 NextFlow
的新手,并且在一些基本概念上苦苦挣扎。我正在将一组 bash
脚本从 a previous publication 转换为 NextFlow
工作流程。
我正在转换 a simple bash script(为方便起见包含在下面),它做了一些基本的准备工作,并为每次迭代向集群调度程序提交了一个新作业。
终极问题:将此脚本合并到 NextFlow 工作流程(最好使用新的 DSL2 模式)的最类似于 NextFlow 的方式是什么?
可能的子问题:是否可以发出基于bash
变量的列表列表?我见过将列表从工作流 传递到 进程的方法,但不是 从 进程传递出去的方法。我可以将每组参数打印到一个文件中,然后发出该文件,但这看起来不太像 NextFlow。
对于如何将以下 bash
脚本合并到 NextFlow 工作流中的任何指导,我将不胜感激。我添加了注释并指明了我需要作为一组参数发出的四个变量。
谢谢!
# Input variables. I know how to take these in.
GVCF_DIR=
GATK_bed=
RESULT_DIR=
CAMO_MASK_REF_PREFIX=
GATK_JAR=
# For each directory
for dir in ${GVCF_DIR}/*
do
# Do some some basic prep work defining
# variables and setting up results directory
ploidy=$(basename $dir)
repeat=$((${ploidy##*_} / 2))
result_dir="${RESULT_DIR}/genotyped_by_region/${ploidy}" # Needs to be emitted
mkdir -p $result_dir
# Create a new file with a list of files. This file
# will be used as input to the downstream NextFlow process
gvcf_list="${ploidy}.gvcfs.list" # Needs to be emitted
find $dir -name "*.g.vcf" > $gvcf_list
REF="${CAMO_MASK_REF_PREFIX}.${ploidy}.fa" # Needs to be emitted
# For each line in the $GATK_bed file where
# column 5 == repeat (defined above), submit
# a new job to the scheduler with that region.
awk "$5 == $repeat {print $1\":\"$2\"-\"$3}" $GATK_bed | \
while read region # Needs to be emitted
do
qsub combine_and_genotype.ogs \
$gvcf_list \
$region \
$result_dir \
$REF \
$GATK_JAR
done
done
What is the most NextFlow-like way to incorporate this script into a
NextFlow workflow
在某些情况下,可以合并不需要“按原样”编译的 third-party scripts,方法是使它们可执行并将它们移动到根目录中名为 'bin' 的文件夹中您的项目存储库的目录。 Nextflow自动将此文件夹添加到执行环境的$PATH中。
但是,某些脚本不适合以这种方式收录。如果 objective 是为了生成可移植且可重现的工作流程,情况尤其如此,这就是我解释“最像 Nextflow 的方式”的方式。 objective 最终变成了 运行 每个过程步骤如何孤立。鉴于您的示例,以下是我对此的看法:
nextflow.enable.dsl=2
params.GVCF_DIRECTORY = './path/to/directories'
params.GATK_BED_FILE = './path/to/file.bed'
params.CAMO_MASK_REF_PREFIX = 'someprefix'
params.publish_dir = './results'
process combine_and_genotype {
publishDir "${params.publish_dir}/${dirname}"
container 'quay.io/biocontainers/gatk4:4.2.4.1--hdfd78af_0'
cpus 1
memory 40.GB
input:
tuple val(dirname), val(region_string), path(ref_fasta), path(gvcf_files)
output:
tuple val(dirname), val(region_string), path("full_cohort.combined.${region}.g.vcf")
script:
region = region_string.replaceAll(':', '_')
def avail_mem = task.memory ? task.memory.toGiga() : 0
def Xmx = avail_mem >= 8 ? "-Xmx${avail_mem - 1}G" : ''
def Xms = avail_mem >= 8 ? "-Xms${avail_mem.intdiv(2)}G" : ''
"""
cat << __EOF__ > "${dirname}.gvcf.list"
${gvcf_files.join('\n'+' '*4)}
__EOF__
gatk \
--java-options "${Xmx} ${Xms} -XX:+UseSerialGC" \
CombineGVCFs \
-R "${ref_fasta}" \
-L "${region_string}" \
-O "full_cohort.combined.${region}.g.vcf" \
-V "${dirname}.gvcf.list"
gatk \
--java-options "${Xmx} ${Xms} -XX:+UseSerialGC" \
GenotypeGVCFs \
-R "${ref_fasta}" \
-L "${region_string}" \
-O "full_cohort.combined.${region}.vcf" \
-V "full_cohort.combined.${region}.g.vcf" \
-A GenotypeSummaries
"""
}
workflow {
GVCF_DIRECTORY = file( params.GVCF_DIRECTORY )
GATK_BED_FILE = file( params.GATK_BED_FILE )
Channel.fromPath( params.GATK_BED_FILE ) \
| splitCsv(sep: '\t') \
| map { row ->
tuple( row[4].toInteger(), "${row[0]}:${row[1]}-${row[2]}" )
} \
| set { regions }
Channel.fromPath( "${GVCF_DIRECTORY.toString()}/**/*.g.vcf" ) \
| map { tuple( GVCF_DIRECTORY.relativize(it).subpath(0,1).name, it ) } \
| groupTuple() \
| map { dirname, gvcf_files ->
def ploidy = dirname.replaceFirst(/^.*_/, "").toInteger()
def repeat = ploidy.intdiv(2)
def ref_fasta = file( "${params.CAMO_MASK_REF_PREFIX}.${dirname}.fa" )
tuple( repeat, dirname, ref_fasta, gvcf_files )
} \
| combine( regions, by: 0 ) \
| map { repeat, dirname, ref_fasta, gvcf_files, region ->
tuple( dirname, region, ref_fasta, gvcf_files )
} \
| combine_and_genotype
}
从 GATK 文档中,我实际上看不到变体输入在哪里可以是文件列表。也许此功能只能使用较旧的 GATK。上面的代码未经测试。
另外,您需要确保您的代码使用四个空格缩进。如果使用制表符缩进,或者如果您要使用不同数量的空格进行缩进,以上代码将引发一些错误。
这里是 NextFlow
的新手,并且在一些基本概念上苦苦挣扎。我正在将一组 bash
脚本从 a previous publication 转换为 NextFlow
工作流程。
我正在转换 a simple bash script(为方便起见包含在下面),它做了一些基本的准备工作,并为每次迭代向集群调度程序提交了一个新作业。
终极问题:将此脚本合并到 NextFlow 工作流程(最好使用新的 DSL2 模式)的最类似于 NextFlow 的方式是什么?
可能的子问题:是否可以发出基于bash
变量的列表列表?我见过将列表从工作流 传递到 进程的方法,但不是 从 进程传递出去的方法。我可以将每组参数打印到一个文件中,然后发出该文件,但这看起来不太像 NextFlow。
对于如何将以下 bash
脚本合并到 NextFlow 工作流中的任何指导,我将不胜感激。我添加了注释并指明了我需要作为一组参数发出的四个变量。
谢谢!
# Input variables. I know how to take these in.
GVCF_DIR=
GATK_bed=
RESULT_DIR=
CAMO_MASK_REF_PREFIX=
GATK_JAR=
# For each directory
for dir in ${GVCF_DIR}/*
do
# Do some some basic prep work defining
# variables and setting up results directory
ploidy=$(basename $dir)
repeat=$((${ploidy##*_} / 2))
result_dir="${RESULT_DIR}/genotyped_by_region/${ploidy}" # Needs to be emitted
mkdir -p $result_dir
# Create a new file with a list of files. This file
# will be used as input to the downstream NextFlow process
gvcf_list="${ploidy}.gvcfs.list" # Needs to be emitted
find $dir -name "*.g.vcf" > $gvcf_list
REF="${CAMO_MASK_REF_PREFIX}.${ploidy}.fa" # Needs to be emitted
# For each line in the $GATK_bed file where
# column 5 == repeat (defined above), submit
# a new job to the scheduler with that region.
awk "$5 == $repeat {print $1\":\"$2\"-\"$3}" $GATK_bed | \
while read region # Needs to be emitted
do
qsub combine_and_genotype.ogs \
$gvcf_list \
$region \
$result_dir \
$REF \
$GATK_JAR
done
done
What is the most NextFlow-like way to incorporate this script into a NextFlow workflow
在某些情况下,可以合并不需要“按原样”编译的 third-party scripts,方法是使它们可执行并将它们移动到根目录中名为 'bin' 的文件夹中您的项目存储库的目录。 Nextflow自动将此文件夹添加到执行环境的$PATH中。
但是,某些脚本不适合以这种方式收录。如果 objective 是为了生成可移植且可重现的工作流程,情况尤其如此,这就是我解释“最像 Nextflow 的方式”的方式。 objective 最终变成了 运行 每个过程步骤如何孤立。鉴于您的示例,以下是我对此的看法:
nextflow.enable.dsl=2
params.GVCF_DIRECTORY = './path/to/directories'
params.GATK_BED_FILE = './path/to/file.bed'
params.CAMO_MASK_REF_PREFIX = 'someprefix'
params.publish_dir = './results'
process combine_and_genotype {
publishDir "${params.publish_dir}/${dirname}"
container 'quay.io/biocontainers/gatk4:4.2.4.1--hdfd78af_0'
cpus 1
memory 40.GB
input:
tuple val(dirname), val(region_string), path(ref_fasta), path(gvcf_files)
output:
tuple val(dirname), val(region_string), path("full_cohort.combined.${region}.g.vcf")
script:
region = region_string.replaceAll(':', '_')
def avail_mem = task.memory ? task.memory.toGiga() : 0
def Xmx = avail_mem >= 8 ? "-Xmx${avail_mem - 1}G" : ''
def Xms = avail_mem >= 8 ? "-Xms${avail_mem.intdiv(2)}G" : ''
"""
cat << __EOF__ > "${dirname}.gvcf.list"
${gvcf_files.join('\n'+' '*4)}
__EOF__
gatk \
--java-options "${Xmx} ${Xms} -XX:+UseSerialGC" \
CombineGVCFs \
-R "${ref_fasta}" \
-L "${region_string}" \
-O "full_cohort.combined.${region}.g.vcf" \
-V "${dirname}.gvcf.list"
gatk \
--java-options "${Xmx} ${Xms} -XX:+UseSerialGC" \
GenotypeGVCFs \
-R "${ref_fasta}" \
-L "${region_string}" \
-O "full_cohort.combined.${region}.vcf" \
-V "full_cohort.combined.${region}.g.vcf" \
-A GenotypeSummaries
"""
}
workflow {
GVCF_DIRECTORY = file( params.GVCF_DIRECTORY )
GATK_BED_FILE = file( params.GATK_BED_FILE )
Channel.fromPath( params.GATK_BED_FILE ) \
| splitCsv(sep: '\t') \
| map { row ->
tuple( row[4].toInteger(), "${row[0]}:${row[1]}-${row[2]}" )
} \
| set { regions }
Channel.fromPath( "${GVCF_DIRECTORY.toString()}/**/*.g.vcf" ) \
| map { tuple( GVCF_DIRECTORY.relativize(it).subpath(0,1).name, it ) } \
| groupTuple() \
| map { dirname, gvcf_files ->
def ploidy = dirname.replaceFirst(/^.*_/, "").toInteger()
def repeat = ploidy.intdiv(2)
def ref_fasta = file( "${params.CAMO_MASK_REF_PREFIX}.${dirname}.fa" )
tuple( repeat, dirname, ref_fasta, gvcf_files )
} \
| combine( regions, by: 0 ) \
| map { repeat, dirname, ref_fasta, gvcf_files, region ->
tuple( dirname, region, ref_fasta, gvcf_files )
} \
| combine_and_genotype
}
从 GATK 文档中,我实际上看不到变体输入在哪里可以是文件列表。也许此功能只能使用较旧的 GATK。上面的代码未经测试。
另外,您需要确保您的代码使用四个空格缩进。如果使用制表符缩进,或者如果您要使用不同数量的空格进行缩进,以上代码将引发一些错误。