snakemake 检查点并根据创建的输出文件创建扩展 list/wildcards

snakemake checkpoint and create a expand list/wildcards based on created output files

希望有人能指导我正确的方向。请参阅下面的小示例:

from glob import glob

rule all:
    input:
        expand("output/step4/{number}_step4.txt", number=["1","2","3","4"])

checkpoint split_to_fasta:
    input:
        seqfile = "files/seqs.csv"#just an empty file in this example
    output:
        fastas=directory("output/fasta")
    shell:
        #A python script will create below files and I dont know them beforehand.
        "mkdir -p {output.fastas} ; "
        "echo test > {output.fastas}/1_LEFT_sample_name_foo.fa ;"
        "echo test >  {output.fastas}/1_RIGHT_sample_name_foo.fa ;"
        "echo test >  {output.fastas}/2_LEFT_sample_name_spam.fa ;"
        "echo test >  {output.fastas}/2_RIGHT_sample_name_bla.fa ;"
        "echo test >  {output.fastas}/3_LEFT_sample_name_egg.fa ;"
        "echo test >  {output.fastas}/4_RIGHT_sample_name_ham.fa ;"

rule step2:
    input:
        fasta = "output/fasta/{fasta}.fa"
    output:
        step2 = "output/step2/{fasta}_step2.txt",
    shell:
        "cp {input.fasta} {output.step2}"

rule step3:
    input:
        file = rules.step2.output.step2
    output:
        step3 = "output/step3/{fasta}_step3.txt",
    shell:
        "cp {input.file} {output.step3}"

def aggregate_input(wildcards):
    checkpoint_output = checkpoints.split_to_fasta.get(**wildcards).output[0]
    ###dont know where to use this line correctly
    ###files = [Path(x).stem.split("_")[0] for x in glob("output/step3/"+ f"*_step3.txt") ]
    return expand("output/step3/{fasta}_step3.txt", fasta=glob_wildcards(os.path.join(checkpoint_output, "{fasta}.fa")).fasta)

def get_id_files(wildcards):
    blast = glob("output/step3/"+ f"{wildcards.number}*_step3.txt")
    return sorted(blast)

rule step4:
    input:
        step3files = aggregate_input,
        idfiles = get_id_files
    output:
        step4 = "output/step4/{number}_step4.txt",
    run:
        shell("cat {input.idfiles} > {output.step4}")

因为规则,所有 snakemake 都知道如何“启动”管道。我硬编码了数字 1、2、3 和 4,但在实际情况下我事先不知道这些数字。

expand("output/step4/{number}_step4.txt", number=["1","2","3","4"])

我想要的是根据 split_to_fasta、step2 或 step3 的输出文件名获取这些数字。然后将其用作通配符的目标。 (我可以很容易地用 glob 和 split 得到数字)

我想像 def get_id_files 那样使用通配符,因为我想并行执行下一步。也就是说,下一步需要走以下几组文件:

[1_LEFT_sample_name_foo.fa, 1_RIGHT_sample_name_foo.fa]
[2_LEFT_sample_name_spam.fa, 2_RIGHT_sample_name_bla.fa]
[3_LEFT_sample_name_egg.fa]
[4_RIGHT_sample_name_ham.fa]

A 可能需要第二个检查点,但不确定如何实施。

编辑(解决方案):

我已经担心我的问题不是很清楚所以我又举了一个例子,见下文。此管道生成一些假文件(第 3 步结束)。从这一点开始,我想继续并并行处理具有相同 ID 的所有文件。 id 是文件名开头的数字。我可以制作第二个管道,从第 4 步“开始”并依次执行它们,但这听起来像是不好的做法。我想我需要为下一条规则(第 4 步)定义一个目标,但不知道如何根据这种情况做到这一点。定义目标本身的代码类似于:

    files = [Path(x).stem.split("_")[0] for x in glob("output/step3/"+ f"*_step3.txt") ]
    ids = list(set(files))
    expand("output/step4/{number}_step4.txt", number=ids)

第二个例子(编辑为解决方案):

from glob import glob

def aggregate_input(wildcards):
    checkpoint_output = checkpoints.split_to_fasta.get(**wildcards).output[0]
    ids = [Path(x).stem.split("_")[0] for x in glob("output/fasta/"+ f"*.fa") ]
    return expand("output/step3/{fasta}_step3.txt", fasta=glob_wildcards(os.path.join(checkpoint_output, "{fasta}.fa")).fasta) + expand("output/step4/{number}_step4.txt", number=ids)

rule all:
    input:
        aggregate_input,

checkpoint split_to_fasta:
    input:
        seqfile = "files/seqs.csv"
    output:
        fastas=directory("output/fasta")
    shell:
        #A python script will create below files and I dont know them beforehand.
        #I could get the numbers if needed
        "mkdir -p {output.fastas} ; "
        "echo test1 > {output.fastas}/1_LEFT_sample_name_foo.fa ;"
        "echo test2 >  {output.fastas}/1_RIGHT_sample_name_foo.fa ;"
        "echo test3 >  {output.fastas}/2_LEFT_sample_name_spam.fa ;"
        "echo test4 >  {output.fastas}/2_RIGHT_sample_name_bla.fa ;"
        "echo test5 >  {output.fastas}/3_LEFT_sample_name_egg.fa ;"
        "echo test6 >  {output.fastas}/4_RIGHT_sample_name_ham.fa ;"

rule step2:
    input:
        fasta = "output/fasta/{fasta}.fa"
    output:
        step2 = "output/step2/{fasta}_step2.txt",
    shell:
        "cp {input.fasta} {output.step2}"

rule step3:
    input:
        file = rules.step2.output.step2
    output:
        step3 = "output/step3/{fasta}_step3.txt",
    shell:
        "cp {input.file} {output.step3}"

def get_id_files(wildcards):
    #blast = glob("output/step3/"+ f"{wildcards.number}*_step3.txt")
    blast = expand(f"output/step3/{wildcards.number}_{{sample}}_step3.txt", sample=glob_wildcards(f"output/fasta/{wildcards.number}_{{sample}}.fa").sample)
    return blast

rule step4:
    input:
        idfiles = get_id_files
    output:
        step4 = "output/step4/{number}_step4.txt",
    run:
        shell("cat {input.idfiles} > {output.step4}")

如果我对问题的理解正确,则应更改以下行:

ids = [Path(x).stem.split("_")[0] for x in glob("output/step3/"+ f"*_step3.txt") ]

目前 step3 中的文件是 glob-bing(我认为这些文件尚不存在)。相反,glob 的正确内容是 rule split_to_fasta 的输出,所以像这样:

ids = [Path(x).stem.split("_")[0] for x in glob("output/fasta*.fa") ]

后来要用这些ids提取出相关的通配符在expand("output/step3/{fasta}_step3.txt", ...).

中使用

抱歉,这不是一个功能示例,但原始代码有点难以阅读。

将第二个示例中 get_id_functions 中的爆炸线替换为

blast = expand(f"output/step3/{wildcards.number}_{{sample}}_step3.txt", sample=glob_wildcards(f"output/fasta/{wildcards.number}_{{sample}}.fa").sample)

这是我理解检查点的方式,当规则(比如规则 a)的输入是检查点时,a 上游的任何东西都被 DAG 的第一次评估阻止,在检查点之后已成功执行。第二轮评估将从了解检查点的输出开始。

所以在你的情况下,将检查点放在规则 all 中会在第一次评估时隐藏 step2/3/4(因为这些步骤在 all 的上游)。然后执行检查点,然后进行第二次评估。此时,您正在评估一个知道检查点所有输出的新工作流,因此您可以 1. 推断 id 2. 根据 split_to_fasta 输出推断相应的 step3 输出。

第一次评估:规则 all -> 检查点 split_to_fasta(待定)

第二次评估(split_to_fasta 已执行):规则 all -> 检查点 split_to_fasta -> 规则 step_4 ->规则 step_3 -> 规则 step_2

get_id_files 发生在 step_4,其中 step_3 还没有被执行,这就是为什么你需要根据 split_to_fasta 的输出来推断而不是直接找到step 3

的输出