Is it possible to feed an output back into an input in a single Nextflow process?

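I am trying to build a simple feedback loop in my Nextflow script. I get a strange error message that I don't know how to debug. My attempt is modeled on the Nextflow design pattern described here. I need to compute a value with a python3 script that operates on an image, and then pass that value on to subsequent executions of the script. At this stage I just want to get the structure right by adding numbers, but I can't get it to work yet.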

My script:


feedback_ch = Channel.create()
input_ch = Channel.from(feedback_ch)

process test {

    echo true 

    input:
        val chan_a from Channel.from(1,2,3)
        val feedback_val from input_ch
 
    output:
        stdout output_val into feedback_ch

    shell:
    '''
    #!/usr/bin/python3

    new_val = !{chan_a} + !{feedback_val}
    print(new_val)

    
    '''
}

The error message I get:

executor >  local (1)
[cd/67768e] process > test (1) [100%] 1 of 1, failed: 1 ✘
Error executing process > 'test (1)'

Caused by:
  Process `test (1)` terminated with an error exit status (1)

Command executed:

  #!/usr/bin/python3
  
  new_val = 1 + DataflowQueue(queue=[])
  print(new_val)

Command exit status:
  1

Command output:
  (empty)

Command error:
  Traceback (most recent call last):
    File ".command.sh", line 3, in <module>
      new_val = 1 + DataflowQueue(queue=[])
  NameError: name 'DataflowQueue' is not defined

Work dir:
  /home/cv_proj/work/cd/67768e706f50d7675ae93645a0ce6e

Tip: you can replicate the issue by changing to the process work dir and entering the command `bash .command.run`

Does anyone have any ideas?

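The problem is that input_ch emits the feedback channel object itself (an empty DataflowQueue) rather than a number, because Channel.from(feedback_ch) wraps the channel as a single item. Nextflow substitutes the variables into your Python code as text, which yields: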

  #!/usr/bin/python3
  
  new_val = 1 + DataflowQueue(queue=[])
  print(new_val)

This is nonsense: you want a number there, not DataflowQueue(queue=[]).
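To make the substitution step concrete, here is a minimal plain-Python sketch of it. It is only a model: FakeQueue, render and run are hypothetical names invented for this illustration, and the regex stands in for whatever Nextflow does internally. The only thing it assumes is that a placeholder is replaced by the text form of the value it refers to.

```python
import re


class FakeQueue:
    """Hypothetical stand-in for the channel object; only its text form matters."""

    def __str__(self):
        return "DataflowQueue(queue=[])"


def render(template, values):
    """Replace each !{name} placeholder with the text form of values[name]."""
    return re.sub(r"!\{(\w+)\}", lambda m: str(values[m.group(1)]), template)


def run(source):
    """Execute the rendered script and report new_val, or the NameError it raises."""
    scope = {}
    try:
        exec(source, scope)
        return scope["new_val"]
    except NameError as err:
        return f"NameError: {err}"


template = "new_val = !{chan_a} + !{feedback_val}"

# Passing the channel object: its text form lands in the script verbatim.
broken = render(template, {"chan_a": 1, "feedback_val": FakeQueue()})
# Passing a plain number: the script becomes valid Python.
fixed = render(template, {"chan_a": 1, "feedback_val": 2})

print(broken)
print(run(broken))
print(fixed)
print(run(fixed))
```

The first rendered script is exactly the line from your traceback, and Python fails on it with the same NameError, so the fix has to happen on the Nextflow side, before the script is rendered.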
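The second problem is that you are not mixing the channels, which appears to be essential to this pattern. In any case, I fixed both issues to get a working proof of concept: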

condition = { it.trim().toInteger() > 10 }  // As your output is stdout, you need to trim() to get rid of newline. Then cast to Integer to compare.
feedback_ch = Channel.create()
input_ch = Channel.from(1,2,3).mix( feedback_ch.until(condition) )  // Mixing channel, so we have feedback

process test {

    input:
    val chan_a from input_ch

    output:
    stdout output_val into feedback_ch

    script:
    // Trim the incoming value; items fed back from stdout carry a trailing newline.
    def output_val_trimmed = chan_a.toString().trim()
    // Double quotes are used so that Nextflow interpolates the variable above.
    """
    #!/usr/bin/python3

    new_val = ${output_val_trimmed} + ${output_val_trimmed}
    print(new_val)
    """

}
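If it helps to see which values travel around the loop, here is a rough plain-Python model of the script above. It is a sketch under two assumptions, not a description of how Nextflow schedules work: tasks are handled one at a time in FIFO order (Nextflow runs them concurrently, so the real order can differ), and until(condition) stops forwarding for good at the first item that satisfies the condition. The names run_process, stop_condition and simulate are made up for this illustration.

```python
from collections import deque


def run_process(value):
    """Model of the process body: double the input; stdout ends with a newline."""
    return f"{value + value}\n"


def stop_condition(raw_stdout):
    """Same test as the Groovy closure: trim, convert to int, compare with 10."""
    return int(raw_stdout.strip()) > 10


def simulate(seeds):
    """Return the values the process consumes, in the order this model handles them."""
    pending = deque(seeds)  # stands in for input_ch
    feedback_open = True    # turns False once the stop condition has fired
    consumed = []
    while pending:
        item = pending.popleft()
        value = int(str(item).strip())  # like chan_a.toString().trim()
        consumed.append(value)
        out = run_process(value)
        if feedback_open and stop_condition(out):
            feedback_open = False       # assumed: nothing is fed back after this
        elif feedback_open:
            pending.append(out)         # feed the raw stdout back as a new input
    return consumed


print(simulate([1, 2, 3]))
```

Under these assumptions the seeds 1, 2, 3 lead to the process consuming 1, 2, 3, 2, 4, 6, 4, 8: the first output above 10 (12, produced from 6) shuts the feedback path, so the remaining items are processed once more and the loop drains.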

I hope this at least puts you on the right track :)