在脚本之外操作 Nextflow 变量
Manipulate Nextflow variables outside of scripts
我有一个过程iterate_list。 Process iterate_list 获取一个列表并对列表中的每个项目执行一些操作。当 运行ning 脚本时,它需要两个输入。它需要处理的列表和项目(它作为消费者从 rabbitmq 队列中获取)
目前,我为整个列表提供了一个 python 脚本,它遍历每个进行处理(作为一大块)并在完成后 returns。这很好,但是,如果系统重新启动,它将重新开始。
我想知道,我怎样才能让我的 python 脚本每次处理单个项目时,它 returns 项目,我从列表中删除它,然后传入进程的新列表。因此,在系统 restart/crash 的情况下,nextflow 知道它在哪里停止并可以从那里继续。
import groovy.json.JsonSlurper
def jsonSlurper = new JsonSlurper()
def cfg_file = new File('/config.json')
def analysis_config = jsonSlurper.parse(cfg_file)
def cfg_json = cfg_file.getText()
def list_of_items_to_process = []
items = Channel.from(analysis_config.items.keySet())
for (String item : items) {
list_of_items_to_process << item
}
process iterate_list{
echo true
input:
list_of_items_to_process
output:
val 1 into typing_cur
script:
"""
python3.7 process_list_items.py ${my_queue} \'${list_of_items_to_process}\'
"""
}
process signal_completion{
echo true
input:
val typing_cur
script:
"""
echo "all done!"
"""
}
基本上,进程 "iterate_list" 从消息代理的队列中取出一个 "item"。进程 iterate_list 应该类似于:
process iterate_list{
echo true
input:
list_of_items_to_process
output:
val 1 into typing_cur
script:
"""
python3.7 process_list_items.py ${my_queue} \'${list_of_items_to_process}\'
list_of_items_to_process.remove(<output from python script>)
"""
}
因此,对于每一个,它都会删除 运行,删除它刚刚处理的项目,然后重新启动一个新列表。
initial_list = [1,2,3,4]
after_first_process_completes = [2,3,4]
and_eventually = [] <- This is when it should move on to the next process.
看起来您真正想做的是从 Nextflow 进程中操纵全局 ArrayList
。 AFAIK,没有办法完全做到这一点。这就是 channels 的用途。
不清楚您是否真的需要从要处理的项目列表中删除任何项目。 Nextflow 已经可以使用 -resume
选项使用缓存的结果。那么为什么不只传入完整列表和单个项目进行处理呢?
items = Channel.from(['foo', 'bar', 'baz'])
items.into {
items_ch1
items_ch2
}
process iterate_list{
input:
val item from items_ch1
val list_of_items_to_process from items_ch2.collect()
"""
python3.7 process_list_items.py "${item}" '${list_of_items_to_process}'
"""
}
我只能猜测您的 Python 脚本如何使用它的参数,但是如果您要处理的项目列表只是一个占位符,那么您甚至可以输入一个项目的单个元素列表来进程:
items = Channel.from(['foo', 'bar', 'baz'])
process iterate_list{
input:
val item from items
"""
python3.7 process_list_items.py "${item}" '[${item}]'
"""
}
我有一个过程iterate_list。 Process iterate_list 获取一个列表并对列表中的每个项目执行一些操作。当 运行ning 脚本时,它需要两个输入。它需要处理的列表和项目(它作为消费者从 rabbitmq 队列中获取)
目前,我为整个列表提供了一个 python 脚本,它遍历每个进行处理(作为一大块)并在完成后 returns。这很好,但是,如果系统重新启动,它将重新开始。
我想知道,我怎样才能让我的 python 脚本每次处理单个项目时,它 returns 项目,我从列表中删除它,然后传入进程的新列表。因此,在系统 restart/crash 的情况下,nextflow 知道它在哪里停止并可以从那里继续。
import groovy.json.JsonSlurper
def jsonSlurper = new JsonSlurper()
def cfg_file = new File('/config.json')
def analysis_config = jsonSlurper.parse(cfg_file)
def cfg_json = cfg_file.getText()
def list_of_items_to_process = []
items = Channel.from(analysis_config.items.keySet())
for (String item : items) {
list_of_items_to_process << item
}
process iterate_list{
echo true
input:
list_of_items_to_process
output:
val 1 into typing_cur
script:
"""
python3.7 process_list_items.py ${my_queue} \'${list_of_items_to_process}\'
"""
}
process signal_completion{
echo true
input:
val typing_cur
script:
"""
echo "all done!"
"""
}
基本上,进程 "iterate_list" 从消息代理的队列中取出一个 "item"。进程 iterate_list 应该类似于:
process iterate_list{
echo true
input:
list_of_items_to_process
output:
val 1 into typing_cur
script:
"""
python3.7 process_list_items.py ${my_queue} \'${list_of_items_to_process}\'
list_of_items_to_process.remove(<output from python script>)
"""
}
因此,对于每一个,它都会删除 运行,删除它刚刚处理的项目,然后重新启动一个新列表。
initial_list = [1,2,3,4]
after_first_process_completes = [2,3,4]
and_eventually = [] <- This is when it should move on to the next process.
看起来您真正想做的是从 Nextflow 进程中操纵全局 ArrayList
。 AFAIK,没有办法完全做到这一点。这就是 channels 的用途。
不清楚您是否真的需要从要处理的项目列表中删除任何项目。 Nextflow 已经可以使用 -resume
选项使用缓存的结果。那么为什么不只传入完整列表和单个项目进行处理呢?
items = Channel.from(['foo', 'bar', 'baz'])
items.into {
items_ch1
items_ch2
}
process iterate_list{
input:
val item from items_ch1
val list_of_items_to_process from items_ch2.collect()
"""
python3.7 process_list_items.py "${item}" '${list_of_items_to_process}'
"""
}
我只能猜测您的 Python 脚本如何使用它的参数,但是如果您要处理的项目列表只是一个占位符,那么您甚至可以输入一个项目的单个元素列表来进程:
items = Channel.from(['foo', 'bar', 'baz'])
process iterate_list{
input:
val item from items
"""
python3.7 process_list_items.py "${item}" '[${item}]'
"""
}