如何收集 Nextflow 流程的有序输出?

How do I gather ordered outputs of a Nextflow process?

我想按照输入顺序从 Nextflow 流程收集结果。

我知道我可以简单地将所有渠道的值传递到所有流程。这将确保对一起传递给所有进程。但是,当您开始添加多个进程时,该解决方案效果不佳,因为它破坏了这些进程并行 运行 的能力。例如,在提供的示例代码中,如果您要添加一个 add_twenty 进程,然后收集来自 add_ten、add_twenty 和 vals2 的输出。

我尝试过的另一种可能的解决方案是为原始通道中的每个值添加一个键,这实际上将原始通道变成了字典(即哈希)。但我无法让它发挥作用。如有必要,我可以提供一个例子。

我创建了一个玩具示例,其中我创建了两个通道,将一个通道发送到一个进程,然后将处理后的输出和一个原始通道发送到一个新进程。

vals1 = Channel.from(1,2,3,4,5)
vals2 = Channel.from(1,2,3,4,5)


process add_ten {
    input:
    val(vals1)

    output:
    val(new_int) into new_vals1

    exec:
    new_int = vals1 + 10
}

process pair {
    echo true

    input:
    val(new_vals1)
    val(vals2)

    script:
    """
    echo "${new_vals1}, ${vals2}"
    """
}

我希望看到的是个位数匹配的情况:

11, 1
12, 2
13, 3
14, 4
15, 5

即使这些线是混乱的,只要成对存在就可以了。例如,

14, 4
11, 1
13, 3
15, 5
12, 2

然而,我看到的是这样的:

15, 1
13, 2
11, 3
12, 4
14, 5

您可以使用元组和 nextflow 组合运算符来完成此操作:

https://www.nextflow.io/docs/latest/operator.html#combine

这是一个例子:

vals1 = Channel.from([1, 'the'], [2, 'brown'], [3, 'jumps'], [4, 'a'], [5, 'fox'])
vals2 = Channel.from([5,'.'], [4, 'lazy'], [3, 'over'], [2, 'fox'], [1, 'quick'])

vals1
  .combine(vals2, by: 0)
  .println()

当你运行这个时,使用选项-ansi-log false。 您的示例经过一些更改后如下所示:

vals1 = Channel.from(1,2,3,4,5)
vals2 = Channel.from(1,2,3,4,5)

i=0; vals1.map{[i++, it]}.view().set{keyed_vals1}
j=0; vals2.map{[j++, it]}.view().set{keyed_vals2}

process add_ten {

  input: set val(key), val(vals1) from keyed_vals1
  output: set val(key), val(new_vals1) into new_vals1

  exec: new_vals1 = vals1 + 10
}

process pair {
  echo true
  tag "$key $one $two"

  input: set val(key), val(one), val(two) from new_vals1.combine(keyed_vals2, by: 0).view()

  script: "echo '${key} ${one} ${two}'"
}