Is there a way to look up a value from a CSV in Nextflow? Or, alternatively, reuse a CSV?

I create a simple CSV as part of my workflow, which looks like this:

sample,value
A,1
B,0.5

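Separately, I have another channel containing files whose names match the sample names. I would like to use the value associated with each sample name in a new process.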

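I have tried splitting the CSV with .splitCsv, but (unsurprisingly) samples sometimes get the wrong value, even though the number of runs is correct. I also tried using awk inside the script to extract the matching value and save it to a variable. That does use the correct value, but it consumes the CSV file, so only one sample gets processed.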

A super-simplified Nextflow (DSL2) script:

#!/usr/bin/env nextflow
nextflow.enable.dsl=2

process foo {
   input:
   path input_file

   output:
   path 'file.csv', emit: csv

   """
   script that creates csv
   """
}

process bar {
   input:
   path input_file2

   output:
   path 'file.bam', emit: bam

   """
   script that creates bam files
   """
}

process help_me {
   input:
   path csv
   path bam

   output:
   path 'result'

   """
   script that uses value from csv on associated bam file
   """
}

workflow {

   foo(params.input)
   bar(params.input2)
   help_me(foo.out.csv, bar.out.bam)
}

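Thanks!!

Edit: essentially, is there a way to synchronize two channels so that I can use individual rows of the CSV together with the associated file?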

If you have a value channel, you can reuse a file (such as a CSV) any number of times without consuming the channel. For example:

workflow {

   input1 = file( params.input1 )
   input2 = file( params.input2 )

   foo( input1 )
   bar( input2 )

   help_me(foo.out.csv, bar.out.bam)
}

Here, input1 and input2 are both value channels. Also (emphasis mine):

A value channel is implicitly created by a process when an input specifies a simple value in the from clause. Moreover, a value channel is also implicitly created as output for a process whose inputs are only value channels.

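This means that foo.out.csv and bar.out.bam are also value channels. Furthermore, help_me.out is a value channel too. If input2 is changed to a queue channel instead, you can see that input1 can be reused any number of times: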

$ mkdir -p ./path/to/bams
$ touch ./path/to/bams/{A,B,C}.bam
$ touch ./foo.txt
params.input1 = './foo.txt'
params.input2 = './path/to/bams/*.bam'

workflow {

   input1 = file( params.input1 )
   input2 = Channel.fromPath( params.input2 )

   foo( input1 )
   bar( input2 )

   help_me(foo.out.csv, bar.out.bam)
}

Results:

$ nextflow run script.nf
N E X T F L O W  ~  version 22.04.0
Launching `script.nf` [trusting_allen] DSL2 - revision: 75209e4c85
executor >  local (7)
[24/d459f7] process > foo         [100%] 1 of 1 ✔
[04/a903e4] process > bar (2)     [100%] 3 of 3 ✔
[24/7a9a1d] process > help_me (3) [100%] 3 of 3 ✔

Note that bar.out.bam and help_me.out are now queue channels.
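
If foo itself receives a queue channel, foo.out.csv will be a queue channel as well and would only pair with the first BAM. One possible workaround, sketched here under the assumption that foo emits exactly one CSV, is to convert that output to a value channel with the first operator (turning input1 into a queue channel is done purely for illustration):

```nextflow
workflow {

   input1 = Channel.fromPath( params.input1 )   // queue channel (assumed for illustration)
   input2 = Channel.fromPath( params.input2 )   // queue channel, one item per BAM

   foo( input1 )
   bar( input2 )

   // first() returns a value channel, so the single CSV can be
   // reused for every BAM emitted by bar
   help_me( foo.out.csv.first(), bar.out.bam )
}
```

This only helps when the same CSV should be seen by every task. It does not match rows to samples.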

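If instead you have one CSV per sample (or something similar), you will need some way to join these channels beforehand and adjust the input declaration of the new process accordingly. What you want to avoid is declaring two (or more) queue channels in the input block. This section of the docs is well worth taking the time to read: Understand how multiple input channels work. It would also explain why you saw the incorrect value being associated with a particular sample when consuming the splitCsv output. To join these channels, you can use the join operator. For example, given your simple CSV (saved as 'foo.csv') and the test BAMs created earlier: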

nextflow.enable.dsl=2

params.input1 = './foo.csv'
params.input2 = './path/to/bams/*.bam'


process help_me {

   debug true

   input:
   tuple val(sample), val(myval), path(bam)

   output:
   path 'result'

   """
   echo -n "sample: ${sample}, myval: ${myval}, bam: ${bam}"
   touch result
   """
}

workflow {

    Channel.fromPath( params.input1 ) \
       | splitCsv( header:true ) \
       | map { row -> tuple( row.sample, row.value ) } \
       | set { rows_ch }

    Channel.fromPath( params.input2 ) \
       | map { bam -> tuple( bam.baseName, bam ) } \
       | join( rows_ch ) \
       | map { sample, bam, myval -> tuple( sample, myval, bam ) } \
       | help_me
}

Results:

$ nextflow run script.nf
N E X T F L O W  ~  version 22.04.0
Launching `script.nf` [lethal_mayer] DSL2 - revision: 395732babc
executor >  local (2)
[c5/e96085] process > help_me (1) [100%] 2 of 2 ✔
sample: B, myval: 0.5, bam: B.bam
sample: A, myval: 1, bam: A.bam

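If your CSV has multiple values for a given sample, and these values are specified on separate rows, you may want the combine operator instead. For example, suppose your 'foo.csv' contained: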

sample,value
A,1
B,0.5
B,2

and you replaced join( rows_ch ) in the example above with combine( rows_ch, by:0 ). Results:

$ nextflow run script.nf
N E X T F L O W  ~  version 22.04.0
Launching `script.nf` [festering_miescher] DSL2 - revision: f8de1e0d20
executor >  local (3)
[ee/8af543] process > help_me (3) [100%] 3 of 3 ✔
sample: A, myval: 1, bam: A.bam
sample: B, myval: 0.5, bam: B.bam
sample: B, myval: 2, bam: B.bam
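
Note that in the examples above C.bam has no row in the CSV, so join drops it without any message. If you would rather see which samples failed to pair up, a minimal sketch (assuming the same params.input1 and params.input2 as above; the bams_ch name is introduced here for illustration) is to pass remainder: true to join and inspect the tuples that come back padded with null:

```nextflow
workflow {

    rows_ch = Channel.fromPath( params.input1 )
        .splitCsv( header:true )
        .map { row -> tuple( row.sample, row.value ) }

    bams_ch = Channel.fromPath( params.input2 )
        .map { bam -> tuple( bam.baseName, bam ) }

    // remainder: true also emits keys present on one side only,
    // with null in place of the missing element
    bams_ch
        .join( rows_ch, remainder: true )
        .filter { item -> item[1] == null || item[2] == null }
        .view { item -> "no match for sample: ${item[0]}" }
}
```

With the test files created earlier, this should flag sample C. The join operator also accepts failOnMismatch: true if you would prefer the run to stop with an error instead.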