snakemake 检查点并根据创建的输出文件创建扩展 list/wildcards
snakemake checkpoint and create a expand list/wildcards based on created output files
希望有人能指导我正确的方向。请参阅下面的小示例:
from glob import glob
rule all:
input:
expand("output/step4/{number}_step4.txt", number=["1","2","3","4"])
checkpoint split_to_fasta:
input:
seqfile = "files/seqs.csv"#just an empty file in this example
output:
fastas=directory("output/fasta")
shell:
#A python script will create below files and I dont know them beforehand.
"mkdir -p {output.fastas} ; "
"echo test > {output.fastas}/1_LEFT_sample_name_foo.fa ;"
"echo test > {output.fastas}/1_RIGHT_sample_name_foo.fa ;"
"echo test > {output.fastas}/2_LEFT_sample_name_spam.fa ;"
"echo test > {output.fastas}/2_RIGHT_sample_name_bla.fa ;"
"echo test > {output.fastas}/3_LEFT_sample_name_egg.fa ;"
"echo test > {output.fastas}/4_RIGHT_sample_name_ham.fa ;"
rule step2:
input:
fasta = "output/fasta/{fasta}.fa"
output:
step2 = "output/step2/{fasta}_step2.txt",
shell:
"cp {input.fasta} {output.step2}"
rule step3:
input:
file = rules.step2.output.step2
output:
step3 = "output/step3/{fasta}_step3.txt",
shell:
"cp {input.file} {output.step3}"
def aggregate_input(wildcards):
checkpoint_output = checkpoints.split_to_fasta.get(**wildcards).output[0]
###dont know where to use this line correctly
###files = [Path(x).stem.split("_")[0] for x in glob("output/step3/"+ f"*_step3.txt") ]
return expand("output/step3/{fasta}_step3.txt", fasta=glob_wildcards(os.path.join(checkpoint_output, "{fasta}.fa")).fasta)
def get_id_files(wildcards):
blast = glob("output/step3/"+ f"{wildcards.number}*_step3.txt")
return sorted(blast)
rule step4:
input:
step3files = aggregate_input,
idfiles = get_id_files
output:
step4 = "output/step4/{number}_step4.txt",
run:
shell("cat {input.idfiles} > {output.step4}")
因为规则,所有 snakemake 都知道如何“启动”管道。我硬编码了数字 1、2、3 和 4,但在实际情况下我事先不知道这些数字。
expand("output/step4/{number}_step4.txt", number=["1","2","3","4"])
我想要的是根据 split_to_fasta、step2 或 step3 的输出文件名获取这些数字。然后将其用作通配符的目标。 (我可以很容易地用 glob 和 split 得到数字)
我想像 def get_id_files
那样使用通配符,因为我想并行执行下一步。也就是说,下一步需要走以下几组文件:
[1_LEFT_sample_name_foo.fa, 1_RIGHT_sample_name_foo.fa]
[2_LEFT_sample_name_spam.fa, 2_RIGHT_sample_name_bla.fa]
[3_LEFT_sample_name_egg.fa]
[4_RIGHT_sample_name_ham.fa]
A 可能需要第二个检查点,但不确定如何实施。
编辑(解决方案):
我已经担心我的问题不是很清楚所以我又举了一个例子,见下文。此管道生成一些假文件(第 3 步结束)。从这一点开始,我想继续并并行处理具有相同 ID 的所有文件。 id 是文件名开头的数字。我可以制作第二个管道,从第 4 步“开始”并依次执行它们,但这听起来像是不好的做法。我想我需要为下一条规则(第 4 步)定义一个目标,但不知道如何根据这种情况做到这一点。定义目标本身的代码类似于:
files = [Path(x).stem.split("_")[0] for x in glob("output/step3/"+ f"*_step3.txt") ]
ids = list(set(files))
expand("output/step4/{number}_step4.txt", number=ids)
第二个例子(编辑为解决方案):
from glob import glob
def aggregate_input(wildcards):
checkpoint_output = checkpoints.split_to_fasta.get(**wildcards).output[0]
ids = [Path(x).stem.split("_")[0] for x in glob("output/fasta/"+ f"*.fa") ]
return expand("output/step3/{fasta}_step3.txt", fasta=glob_wildcards(os.path.join(checkpoint_output, "{fasta}.fa")).fasta) + expand("output/step4/{number}_step4.txt", number=ids)
rule all:
input:
aggregate_input,
checkpoint split_to_fasta:
input:
seqfile = "files/seqs.csv"
output:
fastas=directory("output/fasta")
shell:
#A python script will create below files and I dont know them beforehand.
#I could get the numbers if needed
"mkdir -p {output.fastas} ; "
"echo test1 > {output.fastas}/1_LEFT_sample_name_foo.fa ;"
"echo test2 > {output.fastas}/1_RIGHT_sample_name_foo.fa ;"
"echo test3 > {output.fastas}/2_LEFT_sample_name_spam.fa ;"
"echo test4 > {output.fastas}/2_RIGHT_sample_name_bla.fa ;"
"echo test5 > {output.fastas}/3_LEFT_sample_name_egg.fa ;"
"echo test6 > {output.fastas}/4_RIGHT_sample_name_ham.fa ;"
rule step2:
input:
fasta = "output/fasta/{fasta}.fa"
output:
step2 = "output/step2/{fasta}_step2.txt",
shell:
"cp {input.fasta} {output.step2}"
rule step3:
input:
file = rules.step2.output.step2
output:
step3 = "output/step3/{fasta}_step3.txt",
shell:
"cp {input.file} {output.step3}"
def get_id_files(wildcards):
#blast = glob("output/step3/"+ f"{wildcards.number}*_step3.txt")
blast = expand(f"output/step3/{wildcards.number}_{{sample}}_step3.txt", sample=glob_wildcards(f"output/fasta/{wildcards.number}_{{sample}}.fa").sample)
return blast
rule step4:
input:
idfiles = get_id_files
output:
step4 = "output/step4/{number}_step4.txt",
run:
shell("cat {input.idfiles} > {output.step4}")
如果我对问题的理解正确,则应更改以下行:
ids = [Path(x).stem.split("_")[0] for x in glob("output/step3/"+ f"*_step3.txt") ]
目前 step3
中的文件是 glob
-bing(我认为这些文件尚不存在)。相反,glob
的正确内容是 rule split_to_fasta
的输出,所以像这样:
ids = [Path(x).stem.split("_")[0] for x in glob("output/fasta*.fa") ]
后来要用这些ids
提取出相关的通配符在expand("output/step3/{fasta}_step3.txt", ...)
.
中使用
抱歉,这不是一个功能示例,但原始代码有点难以阅读。
将第二个示例中 get_id_functions 中的爆炸线替换为
blast = expand(f"output/step3/{wildcards.number}_{{sample}}_step3.txt", sample=glob_wildcards(f"output/fasta/{wildcards.number}_{{sample}}.fa").sample)
这是我理解检查点的方式,当规则(比如规则 a
)的输入是检查点时,a
上游的任何东西都被 DAG 的第一次评估阻止,在检查点之后已成功执行。第二轮评估将从了解检查点的输出开始。
所以在你的情况下,将检查点放在规则 all
中会在第一次评估时隐藏 step2/3/4(因为这些步骤在 all
的上游)。然后执行检查点,然后进行第二次评估。此时,您正在评估一个知道检查点所有输出的新工作流,因此您可以 1. 推断 id 2. 根据 split_to_fasta
输出推断相应的 step3
输出。
第一次评估:规则 all
-> 检查点 split_to_fasta
(待定)
第二次评估(split_to_fasta
已执行):规则 all
-> 检查点 split_to_fasta
-> 规则 step_4
->规则 step_3
-> 规则 step_2
get_id_files
发生在 step_4
,其中 step_3
还没有被执行,这就是为什么你需要根据 split_to_fasta
的输出来推断而不是直接找到step 3
的输出
希望有人能指导我正确的方向。请参阅下面的小示例:
from glob import glob
rule all:
input:
expand("output/step4/{number}_step4.txt", number=["1","2","3","4"])
checkpoint split_to_fasta:
input:
seqfile = "files/seqs.csv"#just an empty file in this example
output:
fastas=directory("output/fasta")
shell:
#A python script will create below files and I dont know them beforehand.
"mkdir -p {output.fastas} ; "
"echo test > {output.fastas}/1_LEFT_sample_name_foo.fa ;"
"echo test > {output.fastas}/1_RIGHT_sample_name_foo.fa ;"
"echo test > {output.fastas}/2_LEFT_sample_name_spam.fa ;"
"echo test > {output.fastas}/2_RIGHT_sample_name_bla.fa ;"
"echo test > {output.fastas}/3_LEFT_sample_name_egg.fa ;"
"echo test > {output.fastas}/4_RIGHT_sample_name_ham.fa ;"
rule step2:
input:
fasta = "output/fasta/{fasta}.fa"
output:
step2 = "output/step2/{fasta}_step2.txt",
shell:
"cp {input.fasta} {output.step2}"
rule step3:
input:
file = rules.step2.output.step2
output:
step3 = "output/step3/{fasta}_step3.txt",
shell:
"cp {input.file} {output.step3}"
def aggregate_input(wildcards):
checkpoint_output = checkpoints.split_to_fasta.get(**wildcards).output[0]
###dont know where to use this line correctly
###files = [Path(x).stem.split("_")[0] for x in glob("output/step3/"+ f"*_step3.txt") ]
return expand("output/step3/{fasta}_step3.txt", fasta=glob_wildcards(os.path.join(checkpoint_output, "{fasta}.fa")).fasta)
def get_id_files(wildcards):
blast = glob("output/step3/"+ f"{wildcards.number}*_step3.txt")
return sorted(blast)
rule step4:
input:
step3files = aggregate_input,
idfiles = get_id_files
output:
step4 = "output/step4/{number}_step4.txt",
run:
shell("cat {input.idfiles} > {output.step4}")
因为规则,所有 snakemake 都知道如何“启动”管道。我硬编码了数字 1、2、3 和 4,但在实际情况下我事先不知道这些数字。
expand("output/step4/{number}_step4.txt", number=["1","2","3","4"])
我想要的是根据 split_to_fasta、step2 或 step3 的输出文件名获取这些数字。然后将其用作通配符的目标。 (我可以很容易地用 glob 和 split 得到数字)
我想像 def get_id_files
那样使用通配符,因为我想并行执行下一步。也就是说,下一步需要走以下几组文件:
[1_LEFT_sample_name_foo.fa, 1_RIGHT_sample_name_foo.fa]
[2_LEFT_sample_name_spam.fa, 2_RIGHT_sample_name_bla.fa]
[3_LEFT_sample_name_egg.fa]
[4_RIGHT_sample_name_ham.fa]
A 可能需要第二个检查点,但不确定如何实施。
编辑(解决方案):
我已经担心我的问题不是很清楚所以我又举了一个例子,见下文。此管道生成一些假文件(第 3 步结束)。从这一点开始,我想继续并并行处理具有相同 ID 的所有文件。 id 是文件名开头的数字。我可以制作第二个管道,从第 4 步“开始”并依次执行它们,但这听起来像是不好的做法。我想我需要为下一条规则(第 4 步)定义一个目标,但不知道如何根据这种情况做到这一点。定义目标本身的代码类似于:
files = [Path(x).stem.split("_")[0] for x in glob("output/step3/"+ f"*_step3.txt") ]
ids = list(set(files))
expand("output/step4/{number}_step4.txt", number=ids)
第二个例子(编辑为解决方案):
from glob import glob
def aggregate_input(wildcards):
checkpoint_output = checkpoints.split_to_fasta.get(**wildcards).output[0]
ids = [Path(x).stem.split("_")[0] for x in glob("output/fasta/"+ f"*.fa") ]
return expand("output/step3/{fasta}_step3.txt", fasta=glob_wildcards(os.path.join(checkpoint_output, "{fasta}.fa")).fasta) + expand("output/step4/{number}_step4.txt", number=ids)
rule all:
input:
aggregate_input,
checkpoint split_to_fasta:
input:
seqfile = "files/seqs.csv"
output:
fastas=directory("output/fasta")
shell:
#A python script will create below files and I dont know them beforehand.
#I could get the numbers if needed
"mkdir -p {output.fastas} ; "
"echo test1 > {output.fastas}/1_LEFT_sample_name_foo.fa ;"
"echo test2 > {output.fastas}/1_RIGHT_sample_name_foo.fa ;"
"echo test3 > {output.fastas}/2_LEFT_sample_name_spam.fa ;"
"echo test4 > {output.fastas}/2_RIGHT_sample_name_bla.fa ;"
"echo test5 > {output.fastas}/3_LEFT_sample_name_egg.fa ;"
"echo test6 > {output.fastas}/4_RIGHT_sample_name_ham.fa ;"
rule step2:
input:
fasta = "output/fasta/{fasta}.fa"
output:
step2 = "output/step2/{fasta}_step2.txt",
shell:
"cp {input.fasta} {output.step2}"
rule step3:
input:
file = rules.step2.output.step2
output:
step3 = "output/step3/{fasta}_step3.txt",
shell:
"cp {input.file} {output.step3}"
def get_id_files(wildcards):
#blast = glob("output/step3/"+ f"{wildcards.number}*_step3.txt")
blast = expand(f"output/step3/{wildcards.number}_{{sample}}_step3.txt", sample=glob_wildcards(f"output/fasta/{wildcards.number}_{{sample}}.fa").sample)
return blast
rule step4:
input:
idfiles = get_id_files
output:
step4 = "output/step4/{number}_step4.txt",
run:
shell("cat {input.idfiles} > {output.step4}")
如果我对问题的理解正确,则应更改以下行:
ids = [Path(x).stem.split("_")[0] for x in glob("output/step3/"+ f"*_step3.txt") ]
目前 step3
中的文件是 glob
-bing(我认为这些文件尚不存在)。相反,glob
的正确内容是 rule split_to_fasta
的输出,所以像这样:
ids = [Path(x).stem.split("_")[0] for x in glob("output/fasta*.fa") ]
后来要用这些ids
提取出相关的通配符在expand("output/step3/{fasta}_step3.txt", ...)
.
抱歉,这不是一个功能示例,但原始代码有点难以阅读。
将第二个示例中 get_id_functions 中的爆炸线替换为
blast = expand(f"output/step3/{wildcards.number}_{{sample}}_step3.txt", sample=glob_wildcards(f"output/fasta/{wildcards.number}_{{sample}}.fa").sample)
这是我理解检查点的方式,当规则(比如规则 a
)的输入是检查点时,a
上游的任何东西都被 DAG 的第一次评估阻止,在检查点之后已成功执行。第二轮评估将从了解检查点的输出开始。
所以在你的情况下,将检查点放在规则 all
中会在第一次评估时隐藏 step2/3/4(因为这些步骤在 all
的上游)。然后执行检查点,然后进行第二次评估。此时,您正在评估一个知道检查点所有输出的新工作流,因此您可以 1. 推断 id 2. 根据 split_to_fasta
输出推断相应的 step3
输出。
第一次评估:规则 all
-> 检查点 split_to_fasta
(待定)
第二次评估(split_to_fasta
已执行):规则 all
-> 检查点 split_to_fasta
-> 规则 step_4
->规则 step_3
-> 规则 step_2
get_id_files
发生在 step_4
,其中 step_3
还没有被执行,这就是为什么你需要根据 split_to_fasta
的输出来推断而不是直接找到step 3