如何从工作流中某个点创建的文件子集继续 snakemake 工作流? (试过检查点)

How can I continue a snakemake workflow from a subset of files created at some point in the workflow? (tried checkpoint)

在我的工作流程中的某个时刻,我选择了一部分文件。我想使用该文件子集继续我工作流程中的后续步骤:

rule all:
    input:
        "results/selected_files.tsv",

checkpoint select_by_size:
    input:
        "results/selected_seqs"
    output:
        directory("results/selected_seqs_by_size")
    shell:
        """
        mkdir -p {output[0]} 
        # The second -size is only for testing, remove it for a real run
        find {input} -size +302c -size -304c -exec cp {{}} {output[0]} \;
        """

def get_selected_files(wildcards):
    ck_output = checkpoints.select_by_size.get(**wildcards).output[0]
    GENES,  = glob_wildcards(join(ck_output, "{gene}.fasta"))
    return expand(join(ck_output, "{GENE}.fasta"), GENE=GENES)

rule create_table_sel:
    input:
        get_selected_files
    output:
        "results/selected_files.tsv"
    run:
        with open(output[0], 'w') as fh:
            for f in input:
               print(Path(f).stem, f, file=fh, sep='\t')

规则 create_table_sel 创建一个 table 如下所示:

JLEOKLFN_00589  results/selected_seqs_by_size/JLEOKLFN_00589.fasta
JLEOKLFN_01812  results/selected_seqs_by_size/JLEOKLFN_01812.fasta

如果我尝试使用此 table 将单个文件放入以下规则,它会失败(将 "results/a3ms.txt" 添加到 rule all

def get_seqs(wildcards):
    df = pd.read_csv("results/selected_files.tsv", sep="\t", header=None, index_col=0)
    return df.loc[wildcards.gene].values[0]

rule hhblits_msa:
    input:
        get_seqs
        # "results/selected_seqs_by_size/{gene}.fasta"
    output:
        "results/hhblits_msa/{gene}.a3m"
    log:
        "logs/hhblits/{gene}_msa.log" 
    params:
        db=config["msa_db"],
        n_iter= 2
    threads: 4
    shell:
        """
        mkdir -p results/hhblits_msa
        hhblits -i {input[0]} -d {params.db} -oa3m {output[0]} -cpu {threads}  -n {params.n_iter} \
        &> {log}
        """

# The intention is to use the above rule as input
rule agg:
    input:
        "results/hhblits_msa/{gene}.a3m"
    output:
        "results/a3ms.txt"
    shell:
        "echo {input} >> {output}"
Wildcards in input files cannot be determined from output files:
'gene'

如果我使用类似 snakemake --cores 4 results/hhblits_msa/JLEOKLFN_00589.a3m 的东西,它运行没有问题,但我当然想使用这个工作流而不需要在命令行中指定输出文件。

如何将规则 hhblits_msa 添加到我的工作流中?我如何才能从同一工作流程中某个时间点创建的文件子集继续任何工作流程?

此外,某种程度上相关的是,是否可以将 checkpoints 对象用于 return 单个文件,而不仅仅是 return 所有带有 expand 的文件?我尝试了一些东西但失败了,因为无法获得通配符。这个想法是使用该单个文件作为任何规则的输入,例如 hhblits_msa.

已添加

问题是使用在 select_by_size 中创建的文件作为 hhblits_msa 的输入。鉴于这些文件是在相同的工作流程中创建的,而且我事先不知道它们,所以我不能将它们包含在 all 中。当我使用此工作流或 建议的工作流时,规则 hhblits_msa 没有被执行。

我不太确定你的工作流程设计,但我觉得你不需要 create_table_sel 规则。

您可能希望将输入函数移至规则 agg,同时将输入函数修改为 return agg 的预期输入,示例:

def get_selected_files(wildcards):
    ck_output = checkpoints.select_by_size.get(**wildcards).output[0]
    GENES,  = glob_wildcards(join(ck_output, "{gene}.fasta"))
    return expand("results/hhblits_msa/{GENE}.a3m", GENE=GENES)

然后根据此更改,您的 hhblits_msa 规则应如下所示:

rule hhblits_msa:
    input:
        "results/selected_seqs_by_size/{gene}.fasta"
    output:
        "results/hhblits_msa/{gene}.a3m"
    ... # log, parameters and shell command

snakemake的执行过程是这样的:

1.agg -> 2.input 函数 get_selected_files -> 3.execute 检查点 -> 4. 执行 hhblits_msa -> 执行 agg