基准通道创建 NextFlow
Benchmark channel creation NextFlow
我正在 NextFlow 上执行分散-聚集操作。
看起来像下面这样:
reads = PATH+"test_1.fq"
outdir = "results"
split_read_ch = channel.fromFilePairs(reads, checkIfExists: true, flat:true ).splitFastq( by: 10, file:"test_split" )
process Scatter_fastP {
tag 'Scatter_fastP'
publishDir outdir
input:
tuple val(name), path(reads) from split_read_ch
output:
file "${reads}.trimmed.fastq" into gather_fatsp_ch
script:
"""
fastp -i ${reads} -o ${reads}.trimmed.fastq
"""
}
gather_fatsp_ch.collectFile().view().println{ it.text }
我 运行 此代码包含 Nextflow (https://www.nextflow.io/docs/latest/tracing.html) 提出的所有基准选项:
nextflow run main.nf -with-report nextflow_report -with-trace nextflow_trace -with-timeline nextflow_timeline -with-dag nextflow_dag.html
在这些跟踪文件中,我可以找到 10 个 Scatter_fastP 进程的资源和速度。
但我还想衡量创建 split_read_ch
和 gather_fastp_ch
频道的资源和速度。
我曾尝试将频道的创建包含在流程中,但我找不到使其正常工作的解决方案。
有没有办法将通道创建包含到跟踪文件中?还是我没有找到将这些通道创建到流程中的方法?
预先感谢您的帮助。
虽然 Nextflow 可以解析 FASTQ 文件并将它们拆分成更小的文件等,但通常最好将这些操作传递给另一个进程或一组进程,尤其是当您的输入 FASTQ 文件很大时。这有两个好处:(1) 您的主要 nextflow 流程不需要那么努力地工作,以及 (2) 您可以在 nextflow 报告中获得详细的任务流程统计信息。
以下示例使用 GNU split 拆分输入的 FASTQ 文件,并使用 groupTuple() 运算符和 groupKey()
内置函数收集输出以尽快流式传输收集的值。您需要适应您的非 gzip 压缩输入:
nextflow.enable.dsl=2
params.num_lines = 40000
params.suffix_length = 5
process split_fastq {
input:
tuple val(name), path(fastq)
output:
tuple val(name), path("${name}-${/[0-9]/*params.suffix_length}.fastq.gz")
shell:
'''
zcat "!{fastq}" | split \
-a "!{params.suffix_length}" \
-d \
-l "!{params.num_lines}" \
--filter='gzip > ${FILE}.fastq.gz' \
- \
"!{name}-"
'''
}
process fastp {
input:
tuple val(name), path(fastq)
output:
tuple val(name), path("${fastq.getBaseName(2)}.trimmed.fastq.gz")
"""
fastp -i "${fastq}" -o "${fastq.getBaseName(2)}.trimmed.fastq.gz"
"""
}
workflow {
Channel.fromFilePairs( './data/*.fastq.gz', size: 1 ) \
| split_fastq \
| map { name, fastq -> tuple( groupKey(name, fastq.size()), fastq ) } \
| transpose() \
| fastp \
| groupTuple() \
| map { key, fastqs -> tuple( key.toString(), fastqs ) } \
| view()
}
我正在 NextFlow 上执行分散-聚集操作。
看起来像下面这样:
reads = PATH+"test_1.fq"
outdir = "results"
split_read_ch = channel.fromFilePairs(reads, checkIfExists: true, flat:true ).splitFastq( by: 10, file:"test_split" )
process Scatter_fastP {
tag 'Scatter_fastP'
publishDir outdir
input:
tuple val(name), path(reads) from split_read_ch
output:
file "${reads}.trimmed.fastq" into gather_fatsp_ch
script:
"""
fastp -i ${reads} -o ${reads}.trimmed.fastq
"""
}
gather_fatsp_ch.collectFile().view().println{ it.text }
我 运行 此代码包含 Nextflow (https://www.nextflow.io/docs/latest/tracing.html) 提出的所有基准选项:
nextflow run main.nf -with-report nextflow_report -with-trace nextflow_trace -with-timeline nextflow_timeline -with-dag nextflow_dag.html
在这些跟踪文件中,我可以找到 10 个 Scatter_fastP 进程的资源和速度。
但我还想衡量创建 split_read_ch
和 gather_fastp_ch
频道的资源和速度。
我曾尝试将频道的创建包含在流程中,但我找不到使其正常工作的解决方案。 有没有办法将通道创建包含到跟踪文件中?还是我没有找到将这些通道创建到流程中的方法?
预先感谢您的帮助。
虽然 Nextflow 可以解析 FASTQ 文件并将它们拆分成更小的文件等,但通常最好将这些操作传递给另一个进程或一组进程,尤其是当您的输入 FASTQ 文件很大时。这有两个好处:(1) 您的主要 nextflow 流程不需要那么努力地工作,以及 (2) 您可以在 nextflow 报告中获得详细的任务流程统计信息。
以下示例使用 GNU split 拆分输入的 FASTQ 文件,并使用 groupTuple() 运算符和 groupKey()
内置函数收集输出以尽快流式传输收集的值。您需要适应您的非 gzip 压缩输入:
nextflow.enable.dsl=2
params.num_lines = 40000
params.suffix_length = 5
process split_fastq {
input:
tuple val(name), path(fastq)
output:
tuple val(name), path("${name}-${/[0-9]/*params.suffix_length}.fastq.gz")
shell:
'''
zcat "!{fastq}" | split \
-a "!{params.suffix_length}" \
-d \
-l "!{params.num_lines}" \
--filter='gzip > ${FILE}.fastq.gz' \
- \
"!{name}-"
'''
}
process fastp {
input:
tuple val(name), path(fastq)
output:
tuple val(name), path("${fastq.getBaseName(2)}.trimmed.fastq.gz")
"""
fastp -i "${fastq}" -o "${fastq.getBaseName(2)}.trimmed.fastq.gz"
"""
}
workflow {
Channel.fromFilePairs( './data/*.fastq.gz', size: 1 ) \
| split_fastq \
| map { name, fastq -> tuple( groupKey(name, fastq.size()), fastq ) } \
| transpose() \
| fastp \
| groupTuple() \
| map { key, fastqs -> tuple( key.toString(), fastqs ) } \
| view()
}