Snakemake:具有批量输入和相应输出的规则
Snakemake: a rule with batched inputs and corresponding outputs
我的工作流基本结构如下:
- 文件是从远程服务器下载的,
- 本地转换然后
- 已分析。
其中一项分析非常耗时,但如果一次 运行 对多个输入文件进行扩展,则它的扩展性很好。此规则的输出与作为批次分析的文件无关,只要它们都共享同一组设置即可。上游和下游规则对单个文件进行操作,因此从工作流的角度来看,此规则是异常值。哪些文件要 运行 可以提前告知,但理想情况下,如果某些输入在此过程中未能生成,规则应该是 运行 减少文件。
下面的例子说明了这个问题:
samples = [ 'a', 'b', 'c', 'd', 'e', 'f' ]
groups = {
'A': samples[0:3],
'B': samples[3:6]
}
rule all:
input:
expand("done/{sample}.txt", sample = samples)
rule create:
output:
"created/{sample}.txt"
shell:
"echo {wildcards.sample} > {output}"
rule analyze:
input:
"created/{sample}.txt"
output:
"analyzed/{sample}.txt"
params:
outdir = "analyzed/"
shell:
"""
sleep 1 # or longer
parallel md5sum {{}} \> {params.outdir}/{{/}} ::: {input}
"""
rule finalize:
input:
"analyzed/{sample}.txt"
output:
"done/{sample}.txt"
shell:
"touch {output}"
规则analyze
是根据groups
中的分配从多个输入生成多个输出文件的规则。规则 create
和 finalize
分别对上游和下游的单个文件进行操作。
有没有办法实现这样的逻辑?我想尽量避免拆分工作流程以适应这种不规则性。
注意:此问题与听起来相似的问题无关 。
如果我没理解错的话。规则 analyze
接受组 A 的输入文件 created/a.txt, created/b.txt, created/c.txt
并给出输出
analyzed/a.txt, analyzed/b.txt, analyzed/c.txt
。 B 组也是如此,所以规则 analyze
运行两次,其他所有运行 6 次。
如果是这样,我制定规则 analyze
输出一个虚拟文件,表示 A 组(或 B 等)中的文件已被分析。下游规则将输入这个虚拟文件并找到相应的 analyzed/{sample}.txt
available.
这是你的例子:
samples = [ 'a', 'b', 'c', 'd', 'e', 'f' ]
groups = {
'A': samples[0:3],
'B': samples[3:6]
}
# Map samples to groups by inverting dict groups
inv_groups = {}
for x in samples:
for k in groups:
if x in groups[k]:
inv_groups[x] = k
rule all:
input:
expand("done/{sample}.txt", sample = samples)
rule create:
output:
"created/{sample}.txt"
shell:
"echo {wildcards.sample} > {output}"
rule analyze:
input:
# Collect input for this group (A, B, etc)
grp= lambda wc: ["created/%s.txt" % x for x in groups[wc.group]]
output:
done = touch('created/{group}.done'),
shell:
"""
# Code that actually does the job...
for x in {input.grp}
do
sn=`basename $x .txt`
touch analyzed/$sn.txt
done
"""
rule finalize:
input:
# Get dummy file for this {sample}.
# If the dummy exists also the corresponding analyzed/{sample}.txt exists.
done = lambda wc: 'created/%s.done' % inv_groups[wc.sample],
output:
fout= "done/{sample}.txt"
params:
fin= "analyzed/{sample}.txt",
shell:
"cp {params.fin} {output.fout}"
我的工作流基本结构如下:
- 文件是从远程服务器下载的,
- 本地转换然后
- 已分析。
其中一项分析非常耗时,但如果一次 运行 对多个输入文件进行扩展,则它的扩展性很好。此规则的输出与作为批次分析的文件无关,只要它们都共享同一组设置即可。上游和下游规则对单个文件进行操作,因此从工作流的角度来看,此规则是异常值。哪些文件要 运行 可以提前告知,但理想情况下,如果某些输入在此过程中未能生成,规则应该是 运行 减少文件。
下面的例子说明了这个问题:
samples = [ 'a', 'b', 'c', 'd', 'e', 'f' ]
groups = {
'A': samples[0:3],
'B': samples[3:6]
}
rule all:
input:
expand("done/{sample}.txt", sample = samples)
rule create:
output:
"created/{sample}.txt"
shell:
"echo {wildcards.sample} > {output}"
rule analyze:
input:
"created/{sample}.txt"
output:
"analyzed/{sample}.txt"
params:
outdir = "analyzed/"
shell:
"""
sleep 1 # or longer
parallel md5sum {{}} \> {params.outdir}/{{/}} ::: {input}
"""
rule finalize:
input:
"analyzed/{sample}.txt"
output:
"done/{sample}.txt"
shell:
"touch {output}"
规则analyze
是根据groups
中的分配从多个输入生成多个输出文件的规则。规则 create
和 finalize
分别对上游和下游的单个文件进行操作。
有没有办法实现这样的逻辑?我想尽量避免拆分工作流程以适应这种不规则性。
注意:此问题与听起来相似的问题无关
如果我没理解错的话。规则 analyze
接受组 A 的输入文件 created/a.txt, created/b.txt, created/c.txt
并给出输出
analyzed/a.txt, analyzed/b.txt, analyzed/c.txt
。 B 组也是如此,所以规则 analyze
运行两次,其他所有运行 6 次。
如果是这样,我制定规则 analyze
输出一个虚拟文件,表示 A 组(或 B 等)中的文件已被分析。下游规则将输入这个虚拟文件并找到相应的 analyzed/{sample}.txt
available.
这是你的例子:
samples = [ 'a', 'b', 'c', 'd', 'e', 'f' ]
groups = {
'A': samples[0:3],
'B': samples[3:6]
}
# Map samples to groups by inverting dict groups
inv_groups = {}
for x in samples:
for k in groups:
if x in groups[k]:
inv_groups[x] = k
rule all:
input:
expand("done/{sample}.txt", sample = samples)
rule create:
output:
"created/{sample}.txt"
shell:
"echo {wildcards.sample} > {output}"
rule analyze:
input:
# Collect input for this group (A, B, etc)
grp= lambda wc: ["created/%s.txt" % x for x in groups[wc.group]]
output:
done = touch('created/{group}.done'),
shell:
"""
# Code that actually does the job...
for x in {input.grp}
do
sn=`basename $x .txt`
touch analyzed/$sn.txt
done
"""
rule finalize:
input:
# Get dummy file for this {sample}.
# If the dummy exists also the corresponding analyzed/{sample}.txt exists.
done = lambda wc: 'created/%s.done' % inv_groups[wc.sample],
output:
fout= "done/{sample}.txt"
params:
fin= "analyzed/{sample}.txt",
shell:
"cp {params.fin} {output.fout}"