如何使用 `fromSRA` 函数下载 `Nextflow` 中的 `FastQ` 文件列表?

How to download a list of `FastQ` files in `Nextflow` using `fromSRA` function?

我有一个包含多个列的 tsv 文件。 run_accession 列是我感兴趣的列之一。它包含各种基因组数据样本的登录 ID。我想在 Nextflow 中编写一个管道,它使用以下命令从该文件中读取登录 ID:

cut -f4 datalist.tsv | sed -n 2,11p

输出:

ERR2512385  
ERR2512386  
ERR2512387  
ERR2512388  
ERR2512389  
ERR2512390  
ERR2512391  
ERR2512392  
ERR2512393  
ERR2512394

并将此 ID 列表提供给 Channel.fromSRA 方法。到目前为止,我已经试过了:

#!/home/someuser/bin nextflow

nextflow.enable.dsl=2

params.datalist = "$baseDir/datalist.tsv"

process fetchRunAccession {
    input:
    path dlist

    output:
    file accessions

    """
    cut -f4 $dlist | sed -n 2,11p
    """
}

process displayResult {
    input:
    file accessions

    output:
    stdout

    """
    echo "$accessions"
    """
}

workflow {
    accessions_p = fetchRunAccession(params.datalist)
    result = displayResult(accessions_p)
    result.view { it }
}

我得到这个错误:

Error executing process > 'fetchRunAccession'

Caused by:
  Missing output file(s) `accessions` expected by process `fetchRunAccession

如果我 运行 只是第一个过程,它运行良好并按预期打印 10 行。第二个过程只是实际 fromSRA 实现的占位符,但我无法将第一个过程的输出用作第二个过程的输入。我是 Nextflow 的新手,我的代码可能有一些愚蠢的错误。我将不胜感激在这件事上的任何帮助。

fromSRA function is actually a factory method. It requires either a project/study id, or one or more accession numbers, which must be specified as a list. A channel emitting accession numbers (like in your example code) will not work here. Also, it would be better to avoid spawning a separate job/process just to parse a small CSV file. Instead, just let your main Nextflow process do this. There's lots of ways to do this, but for CSV input I find using Nextflow's CsvSplitter class 让这变得简单:

import nextflow.splitter.CsvSplitter

nextflow.enable.dsl=2


def fetchRunAccessions( tsv ) {

    def splitter = new CsvSplitter().options( header:true, sep:'\t' )
    def reader = new BufferedReader( new FileReader( tsv ) )

    splitter.parseHeader( reader )

    List<String> run_accessions = []
    Map<String,String> row

    while( row = splitter.fetchRecord( reader ) ) {

       run_accessions.add( row['run_accession'] )
    }

    return run_accessions
}


workflow {

    accessions = fetchRunAccessions( params.filereport )

    Channel
        .fromSRA( accessions )
        .view()
}

请注意,Nextflow 的 ENA 下载 URL 最近已更新。您需要最新版本的 Nextflow (21.07.0-edge) 才能轻松达到 运行:

NXF_VER=21.07.0-edge nextflow run test.nf --filereport filereport.tsv