多个输出到单个列表输入 - 在 Nextflow 中合并 BAM 文件
Multiple outputs to single list input - merging BAM files in Nextflow
我正在尝试合并 x 个通过一次执行多个比对生成的 bam 文件(在 y 个 fastq 的批次上文件)到 Nextflow 中的单个 bam 文件中。
到目前为止,在执行对齐和 sorting/indexing 生成的 bam 文件时,我有以下内容:
//Run minimap2 on concatenated fastqs
process miniMap2Bam {
publishDir "$params.bamDir"
errorStrategy 'retry'
cache 'deep'
maxRetries 3
maxForks 10
memory { 16.GB * task.attempt }
input:
val dirString from dirStr
val runString from stringRun
each file(batchFastq) from fastqBatch.flatMap()
output:
val runString into stringRun1
file("${batchFastq}.bam") into bamFiles
val dirString into dirStrSam
script:
"""
minimap2 --secondary=no --MD -2 -t 10 -a $params.genome ${batchFastq} | samtools sort -o ${batchFastq}.bam
samtools index ${batchFastq}.bam
"""
}
其中${batchFastq}.bam
是一个包含一批y个fastq文件的bam文件。
此管道完成得很好,但是,当尝试在另一个进程 (samToolsMerge) 中对这些 bam 文件执行 samtools merge
时,进程 运行s 每次对齐都是 运行(在本例中为 4),而不是收集所有 bam 文件一次:
//Run samtools merge
process samToolsMerge {
echo true
publishDir "$dirString/aligned_minimap/", mode: 'copy', overwrite: 'false'
cache 'deep'
errorStrategy 'retry'
maxRetries 3
maxForks 10
memory { 14.GB * task.attempt }
input:
val runString from stringRun1
file bamFile from bamFiles.collect()
val dirString from dirStrSam
output:
file("**")
script:
"""
samtools merge ${runString}.bam ${bamFile}
"""
}
输出为:
executor > lsf (9)
[49/182ec0] process > catFastqs (1) [100%] 1 of 1 ✔
[- ] process > nanoPlotSummary -
[0e/609a7a] process > miniMap2Bam (1) [100%] 4 of 4 ✔
[42/72469d] process > samToolsMerge (2) [100%] 4 of 4 ✔
Completed at: 04-Mar-2021 14:54:21
Duration : 5m 41s
CPU hours : 0.2
Succeeded : 9
如何只从 miniMap2Bam
和 运行 通过 samToolsMerge
一次获取生成的 bam 文件,而不是多次处理 运行ning?
提前致谢!
编辑:
感谢 Pallie 在下面的评论中,问题是将先前过程中的 运行String 和 dirString 值输入到 miniMap2Bam,然后是 samToolsMerge,导致每次传递值时该过程都会重复。
解决方案就像从 miniMap2Bam 中删除 vals 一样简单(如下所示):
//Run minimap2 on concatenated fastqs
process miniMap2Bam {
errorStrategy 'retry'
cache 'deep'
maxRetries 3
maxForks 10
memory { 16.GB * task.attempt }
input:
each file(batchFastq) from fastqBatch.flatMap()
output:
file("${batchFastq}.bam") into bamFiles
script:
"""
minimap2 --secondary=no --MD -2 -t 10 -a $params.genome ${batchFastq} | samtools sort -o ${batchFastq}.bam
samtools index ${batchFastq}.bam
"""
}
最简单的修复可能是停止通过通道传递静态 dirstring 和 runstring:
// Instead of a hardcoded path use a parameter you passed via CLI like you did with bamDir
dirString = file("/path/to/fastqs/")
runString = file("/path/to/fastqs/").getParent()
fastqBatch = Channel.from("/path/to/fastqs/")
//Run minimap2 on concatenated fastqs
process miniMap2Bam {
publishDir "$params.bamDir"
errorStrategy 'retry'
cache 'deep'
maxRetries 3
maxForks 10
memory { 16.GB * task.attempt }
input:
each file(batchFastq) from fastqBatch.flatMap()
output:
file("${batchFastq}.bam") into bamFiles
script:
"""
minimap2 --secondary=no --MD -2 -t 10 -a $params.genome ${batchFastq} | samtools sort -o ${batchFastq}.bam
samtools index ${batchFastq}.bam
"""
}
//Run samtools merge
process samToolsMerge {
echo true
publishDir "$dirString/aligned_minimap/", mode: 'copy', overwrite: 'false'
cache 'deep'
errorStrategy 'retry'
maxRetries 3
maxForks 10
memory { 14.GB * task.attempt }
input:
file bamFile from bamFiles.collect()
output:
file("**")
script:
"""
samtools merge ${runString}.bam ${bamFile}
"""
我正在尝试合并 x 个通过一次执行多个比对生成的 bam 文件(在 y 个 fastq 的批次上文件)到 Nextflow 中的单个 bam 文件中。
到目前为止,在执行对齐和 sorting/indexing 生成的 bam 文件时,我有以下内容:
//Run minimap2 on concatenated fastqs
process miniMap2Bam {
publishDir "$params.bamDir"
errorStrategy 'retry'
cache 'deep'
maxRetries 3
maxForks 10
memory { 16.GB * task.attempt }
input:
val dirString from dirStr
val runString from stringRun
each file(batchFastq) from fastqBatch.flatMap()
output:
val runString into stringRun1
file("${batchFastq}.bam") into bamFiles
val dirString into dirStrSam
script:
"""
minimap2 --secondary=no --MD -2 -t 10 -a $params.genome ${batchFastq} | samtools sort -o ${batchFastq}.bam
samtools index ${batchFastq}.bam
"""
}
其中${batchFastq}.bam
是一个包含一批y个fastq文件的bam文件。
此管道完成得很好,但是,当尝试在另一个进程 (samToolsMerge) 中对这些 bam 文件执行 samtools merge
时,进程 运行s 每次对齐都是 运行(在本例中为 4),而不是收集所有 bam 文件一次:
//Run samtools merge
process samToolsMerge {
echo true
publishDir "$dirString/aligned_minimap/", mode: 'copy', overwrite: 'false'
cache 'deep'
errorStrategy 'retry'
maxRetries 3
maxForks 10
memory { 14.GB * task.attempt }
input:
val runString from stringRun1
file bamFile from bamFiles.collect()
val dirString from dirStrSam
output:
file("**")
script:
"""
samtools merge ${runString}.bam ${bamFile}
"""
}
输出为:
executor > lsf (9)
[49/182ec0] process > catFastqs (1) [100%] 1 of 1 ✔
[- ] process > nanoPlotSummary -
[0e/609a7a] process > miniMap2Bam (1) [100%] 4 of 4 ✔
[42/72469d] process > samToolsMerge (2) [100%] 4 of 4 ✔
Completed at: 04-Mar-2021 14:54:21
Duration : 5m 41s
CPU hours : 0.2
Succeeded : 9
如何只从 miniMap2Bam
和 运行 通过 samToolsMerge
一次获取生成的 bam 文件,而不是多次处理 运行ning?
提前致谢!
编辑: 感谢 Pallie 在下面的评论中,问题是将先前过程中的 运行String 和 dirString 值输入到 miniMap2Bam,然后是 samToolsMerge,导致每次传递值时该过程都会重复。
解决方案就像从 miniMap2Bam 中删除 vals 一样简单(如下所示):
//Run minimap2 on concatenated fastqs
process miniMap2Bam {
errorStrategy 'retry'
cache 'deep'
maxRetries 3
maxForks 10
memory { 16.GB * task.attempt }
input:
each file(batchFastq) from fastqBatch.flatMap()
output:
file("${batchFastq}.bam") into bamFiles
script:
"""
minimap2 --secondary=no --MD -2 -t 10 -a $params.genome ${batchFastq} | samtools sort -o ${batchFastq}.bam
samtools index ${batchFastq}.bam
"""
}
最简单的修复可能是停止通过通道传递静态 dirstring 和 runstring:
// Instead of a hardcoded path use a parameter you passed via CLI like you did with bamDir
dirString = file("/path/to/fastqs/")
runString = file("/path/to/fastqs/").getParent()
fastqBatch = Channel.from("/path/to/fastqs/")
//Run minimap2 on concatenated fastqs
process miniMap2Bam {
publishDir "$params.bamDir"
errorStrategy 'retry'
cache 'deep'
maxRetries 3
maxForks 10
memory { 16.GB * task.attempt }
input:
each file(batchFastq) from fastqBatch.flatMap()
output:
file("${batchFastq}.bam") into bamFiles
script:
"""
minimap2 --secondary=no --MD -2 -t 10 -a $params.genome ${batchFastq} | samtools sort -o ${batchFastq}.bam
samtools index ${batchFastq}.bam
"""
}
//Run samtools merge
process samToolsMerge {
echo true
publishDir "$dirString/aligned_minimap/", mode: 'copy', overwrite: 'false'
cache 'deep'
errorStrategy 'retry'
maxRetries 3
maxForks 10
memory { 14.GB * task.attempt }
input:
file bamFile from bamFiles.collect()
output:
file("**")
script:
"""
samtools merge ${runString}.bam ${bamFile}
"""