根据组 ID 将文件传递到 Nextflow 进程
Passing files to a Nextflow process based on a group ID
我正在努力解决一个问题,这个问题可能有一个非常基本的解决方案。
在我的 (dsl2) nextflow 工作流程中,我有许多进程输出一个文件元组和一个值,该值指示输入元素的组。
然后,我有一个最后的过程,对于每个组,收集所有生成的文件并处理它们。
例如:
workflow {
generatePDF(input_channel)
generateCSV(input_channel)
generateTXT(input_channel)
mergeprocess( /*grouped input files from previous 3 processes */ )
}
而且,正如我提到的,每个生成*过程的输出是
tuple file(output_${samplename}.${extension}) val(${group})
例如,如果我有样本 1、2 和 3 属于 A 组,4 属于 B 组,5 和 6 属于 C 组,我想作为最后一个过程的输入传递
output_sample1.pdf output_sample2.pdf output_sample3.pdf output_sample1.csv output_sample2.csv output_sample3.csv output_sample1.txt output_sample2.txt output_sample3.txt
output_sample4.pdf output_sample4.csv output_sample4.txt
output_sample5.pdf output_sample6.pdf output_sample5.csv output_sample6.csv output_sample5.txt output_sample6.txt
我已经测试了 collect()、groupTuple 甚至 join() 的组合,但没有任何东西给我所需的通道。
谢谢你的时间。
一个解决方案,可能是您可能一直在努力实现的 'basic' 解决方案,是 mix your process outputs and then call the groupTuple 运算符,指定要用作分组键的元素的索引,使用 by
参数:
generatePDF.out
| mix( generateCSV.out, generateTXT.out )
| groupTuple( by: 1 )
| view()
但是,此解决方案将'wait' 完成所有生成过程(generatePDF、generateCSV、generateTXT),然后再继续合并 any 分组输出。这是因为 groupTuple 理想情况下需要知道分组列表应包含的项目数:
You should always specify the number of expected elements in each
tuple using the size
attribute to allow the groupTuple
operator to
stream the collected values as soon as possible.
如果您的生成过程都确定给定样本的同一组(它们可能会这样做),您应该能够以允许您预先确定每个组的大小的方式解耦此逻辑。然后您可以创建一个特殊的分组键(即使用 groupKey 函数)并使用它来对流程输出进行分组。以下示例只是从父目录中预先确定组,但希望适用于您的用例:
首先,让我们创建一些测试数据:
mkdir -p input_files/{A,B,C}
touch input_files/A/sample{1,2,3}.ext
touch input_files/B/sample4.ext
touch input_files/C/sample{5,6}.ext
然后,将以下内容放入名为 script.nf
的文件中:
nextflow.enable.dsl=2
process generatePDF {
input:
tuple val(sample_name), path(input_file)
output:
tuple val(sample_name), path("output_${sample_name}.pdf")
"""
touch "output_${sample_name}.pdf"
"""
}
process generateCSV {
input:
tuple val(sample_name), path(input_file)
output:
tuple val(sample_name), path("output_${sample_name}.csv")
"""
touch "output_${sample_name}.csv"
"""
}
process generateTXT {
input:
tuple val(sample_name), path(input_file)
output:
tuple val(sample_name), path("output_${sample_name}.txt")
"""
touch "output_${sample_name}.txt"
"""
}
process merge_process {
tag { group }
echo true
input:
tuple val(group), path(input_files)
"""
echo "group: ${group}"
ls ${input_files}
"""
}
workflow {
Channel.fromPath( './input_files/*/*.ext' )
| map { infile -> tuple( infile.parent.name, infile ) }
| groupTuple()
| map { group, files -> tuple( groupKey(group, files.size()), files) }
| transpose()
| set { input_ch }
input_ch
| map { key, infile -> tuple( infile.baseName, infile ) }
| ( generatePDF & generateCSV & generateTXT )
| mix
| groupTuple( size: 3 )
| set { outputs_ch }
input_ch
| map { key, infile -> tuple( infile.baseName, key ) }
| join( outputs_ch )
| map { sample, key, files -> tuple( key, files ) }
| groupTuple()
| map { key, grp_files -> tuple( key.toString(), grp_files.flatten() ) }
| merge_process
}
结果:
$ nextflow run script.nf
N E X T F L O W ~ version 20.10.0
Launching `script.nf` [irreverent_lavoisier] - revision: aba248d32e
executor > local (21)
[5d/5cd70d] process > generatePDF (1) [100%] 6 of 6 ✔
[4c/c58256] process > generateCSV (2) [100%] 6 of 6 ✔
[d2/93402c] process > generateTXT (5) [100%] 6 of 6 ✔
[9b/04ea07] process > merge_process (C) [100%] 3 of 3 ✔
group: B
output_sample4.csv
output_sample4.pdf
output_sample4.txt
group: A
output_sample1.csv
output_sample1.pdf
output_sample1.txt
output_sample2.csv
output_sample2.pdf
output_sample2.txt
output_sample3.csv
output_sample3.pdf
output_sample3.txt
group: C
output_sample5.csv
output_sample5.pdf
output_sample5.txt
output_sample6.csv
output_sample6.pdf
output_sample6.txt
我正在努力解决一个问题,这个问题可能有一个非常基本的解决方案。 在我的 (dsl2) nextflow 工作流程中,我有许多进程输出一个文件元组和一个值,该值指示输入元素的组。 然后,我有一个最后的过程,对于每个组,收集所有生成的文件并处理它们。 例如:
workflow {
generatePDF(input_channel)
generateCSV(input_channel)
generateTXT(input_channel)
mergeprocess( /*grouped input files from previous 3 processes */ )
}
而且,正如我提到的,每个生成*过程的输出是
tuple file(output_${samplename}.${extension}) val(${group})
例如,如果我有样本 1、2 和 3 属于 A 组,4 属于 B 组,5 和 6 属于 C 组,我想作为最后一个过程的输入传递
output_sample1.pdf output_sample2.pdf output_sample3.pdf output_sample1.csv output_sample2.csv output_sample3.csv output_sample1.txt output_sample2.txt output_sample3.txt
output_sample4.pdf output_sample4.csv output_sample4.txt
output_sample5.pdf output_sample6.pdf output_sample5.csv output_sample6.csv output_sample5.txt output_sample6.txt
我已经测试了 collect()、groupTuple 甚至 join() 的组合,但没有任何东西给我所需的通道。
谢谢你的时间。
一个解决方案,可能是您可能一直在努力实现的 'basic' 解决方案,是 mix your process outputs and then call the groupTuple 运算符,指定要用作分组键的元素的索引,使用 by
参数:
generatePDF.out
| mix( generateCSV.out, generateTXT.out )
| groupTuple( by: 1 )
| view()
但是,此解决方案将'wait' 完成所有生成过程(generatePDF、generateCSV、generateTXT),然后再继续合并 any 分组输出。这是因为 groupTuple 理想情况下需要知道分组列表应包含的项目数:
You should always specify the number of expected elements in each tuple using the
size
attribute to allow thegroupTuple
operator to stream the collected values as soon as possible.
如果您的生成过程都确定给定样本的同一组(它们可能会这样做),您应该能够以允许您预先确定每个组的大小的方式解耦此逻辑。然后您可以创建一个特殊的分组键(即使用 groupKey 函数)并使用它来对流程输出进行分组。以下示例只是从父目录中预先确定组,但希望适用于您的用例:
首先,让我们创建一些测试数据:
mkdir -p input_files/{A,B,C}
touch input_files/A/sample{1,2,3}.ext
touch input_files/B/sample4.ext
touch input_files/C/sample{5,6}.ext
然后,将以下内容放入名为 script.nf
的文件中:
nextflow.enable.dsl=2
process generatePDF {
input:
tuple val(sample_name), path(input_file)
output:
tuple val(sample_name), path("output_${sample_name}.pdf")
"""
touch "output_${sample_name}.pdf"
"""
}
process generateCSV {
input:
tuple val(sample_name), path(input_file)
output:
tuple val(sample_name), path("output_${sample_name}.csv")
"""
touch "output_${sample_name}.csv"
"""
}
process generateTXT {
input:
tuple val(sample_name), path(input_file)
output:
tuple val(sample_name), path("output_${sample_name}.txt")
"""
touch "output_${sample_name}.txt"
"""
}
process merge_process {
tag { group }
echo true
input:
tuple val(group), path(input_files)
"""
echo "group: ${group}"
ls ${input_files}
"""
}
workflow {
Channel.fromPath( './input_files/*/*.ext' )
| map { infile -> tuple( infile.parent.name, infile ) }
| groupTuple()
| map { group, files -> tuple( groupKey(group, files.size()), files) }
| transpose()
| set { input_ch }
input_ch
| map { key, infile -> tuple( infile.baseName, infile ) }
| ( generatePDF & generateCSV & generateTXT )
| mix
| groupTuple( size: 3 )
| set { outputs_ch }
input_ch
| map { key, infile -> tuple( infile.baseName, key ) }
| join( outputs_ch )
| map { sample, key, files -> tuple( key, files ) }
| groupTuple()
| map { key, grp_files -> tuple( key.toString(), grp_files.flatten() ) }
| merge_process
}
结果:
$ nextflow run script.nf
N E X T F L O W ~ version 20.10.0
Launching `script.nf` [irreverent_lavoisier] - revision: aba248d32e
executor > local (21)
[5d/5cd70d] process > generatePDF (1) [100%] 6 of 6 ✔
[4c/c58256] process > generateCSV (2) [100%] 6 of 6 ✔
[d2/93402c] process > generateTXT (5) [100%] 6 of 6 ✔
[9b/04ea07] process > merge_process (C) [100%] 3 of 3 ✔
group: B
output_sample4.csv
output_sample4.pdf
output_sample4.txt
group: A
output_sample1.csv
output_sample1.pdf
output_sample1.txt
output_sample2.csv
output_sample2.pdf
output_sample2.txt
output_sample3.csv
output_sample3.pdf
output_sample3.txt
group: C
output_sample5.csv
output_sample5.pdf
output_sample5.txt
output_sample6.csv
output_sample6.pdf
output_sample6.txt