Snakemake Getting Checkpoint and Aggregate Function to Work

I am having trouble with Snakemake's aggregation step. I want to take a given GTF file, look for separate regions within it, and, if any are found, write those regions out to separate files. This means I don't know in advance how many output GTF files each input GTF file will produce. To handle this, I am trying to use Snakemake checkpoints.

To do this, I wrote a short script called collapse_gtf_file.py, which takes a single GTF file and generates N files, one for each individual region found. So, given a file test_regions.gtf containing three regions, it generates test_regions_1.gtf, test_regions_2.gtf, and test_regions_3.gtf.
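
For illustration, here is a minimal sketch of the script's behavior. The region logic is an assumption made for this example (each distinct gene_id is treated as one region); the real script may identify regions differently:

import argparse
from collections import OrderedDict

# Minimal sketch of collapse_gtf_file.py; assumes each distinct gene_id
# constitutes a "region" (the real script's logic may differ).
def main():
    parser = argparse.ArgumentParser()
    parser.add_argument("-gtf", required=True, help="input GTF file")
    parser.add_argument("-o", required=True, help="output path prefix")
    args = parser.parse_args()

    regions = OrderedDict()
    with open(args.gtf) as fh:
        for line in fh:
            if line.startswith("#"):
                continue
            # gene_id lives in the attributes (9th) column of a GTF line
            attributes = line.rstrip("\n").split("\t")[8]
            gene_id = attributes.split('gene_id "')[1].split('"')[0]
            regions.setdefault(gene_id, []).append(line)

    # one numbered file per region: <prefix>_1.gtf, <prefix>_2.gtf, ...
    for i, lines in enumerate(regions.values(), start=1):
        with open("{}_{}.gtf".format(args.o, i), "w") as out:
            out.writelines(lines)

if __name__ == "__main__":
    main()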

After this splitting step, all of the GTF files should be converted to fasta files and then aggregated.

However, I cannot get my checkpoint to work. I can get a toy example running, but when I try to build the larger pipeline around this checkpoint, it breaks.

So far I have tried to follow the checkpoint tutorial found here: https://snakemake.readthedocs.io/en/stable/snakefile/rules.html#dynamic-files

sample=config["samples"]
reference=config["reference"]

rule all:
    input:
        expand("string_tie_assembly/{sample}.gtf", sample=sample),
        expand("string_tie_assembly_merged/merged_{sample}.gtf", sample=sample),
        #expand("split_gtf_file/{sample}", sample=sample),
        #expand("lncRNA_fasta/{sample}.fa", sample=sample),
        "combined_fasta/all_fastas_combined.fa",
        "run_CPC2/cpc_calc_output.txt"

rule samtools_sort:
    input:
        "mapped_reads/{sample}.bam"
    output:
        "sorted_reads/{sample}.sorted.bam"
    shell:
        "samtools sort -T sorted_reads/{wildcards.sample} {input} > {output}"

rule samtools_index:
    input:
        "sorted_reads/{sample}.sorted.bam"
    output:
        "sorted_reads/{sample}.sorted.bam.bai"
    shell:
        "samtools index {input}"

rule generate_fastq:
    input:
        "sorted_reads/{sample}.sorted.bam"
    output:
        "fastq_reads/{sample}.fastq"
    shell:
        "samtools fastq {input} > {output}"

rule string_tie_assembly:
    input:
        "sorted_reads/{sample}.sorted.bam"
    output:
        "string_tie_assembly/{sample}.gtf"
    shell:
        "stringtie {input} -f 0.0 -a 0 -m 50 -c 3.0 -o {output}"

rule merge_gtf_file_features:
    input:
        "string_tie_assembly/{sample}.gtf"
    output:
        "string_tie_assembly_merged/merged_{sample}.gtf"
    shell:
        #prevents errors when there's no sequence
        """
        set +e
        stringtie --merge -o {output} -m 25 -c 3.0 {input}
        exitcode=$?
        if [ $exitcode -eq 1 ]
        then
            exit 0
        else
            exit 0
        fi
        """

#This is where the issue seems to arise. Modeled after https://snakemake.readthedocs.io/en/stable/snakefile/rules.html#dynamic-files

checkpoint clustering:
    input:
        "string_tie_assembly_merged/merged_{sample}.gtf"
    output:
        clusters = directory("split_gtf_file/{sample}")
    shell:
        """
        mkdir -p split_gtf_file/{wildcards.sample} ;

        python collapse_gtf_file.py -gtf {input} -o split_gtf_file/{wildcards.sample}/{wildcards.sample}
        """

rule gtf_to_fasta:
    input:
        "split_gtf_file/{sample}/{sample}_{i}.gtf"
    output:
        "lncRNA_fasta/{sample}/canidate_{sample}_{i}.fa"
    wildcard_constraints:
        i=r"\d+"
    shell:
        "gffread -w {output} -g {reference} {input}"


def aggregate_input(wildcards):
    checkpoint_output = checkpoints.clustering.get(**wildcards).output[0]
    x = expand("lncRNA_fasta/{sample}/canidate_{sample}_{i}.fa",
               sample=wildcards.sample,
               i=glob_wildcards(os.path.join(checkpoint_output, "{i}.fa")).i)
    return x

rule combine_fasta_file:
    input:
        aggregate_input
    output:
        "combined_fasta/all_fastas_combined.fa"
    shell:
        "cat {input} > {output}"


Error Message:

InputFunctionException in combine_fasta_file:
WorkflowError: Missing wildcard values for sample
Wildcards:

This suggests to me that something is wrong with how I am referring to wildcards in the aggregation function above, but I cannot figure out what. Any pointers would be greatly appreciated.

The function aggregate_input expects the variable wildcards.sample, but you have not specified any wildcard in rule combine_fasta_file. You may want to either specify a wildcard in that rule or refactor the function to use a global variable, whichever applies.
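
For the second option, something like the following might work (an untested sketch; note that the glob pattern has to match the GTF file names the checkpoint actually writes into split_gtf_file/<sample>/ -- here assumed to be <sample>_<i>.gtf, matching your collapse_gtf_file.py call -- rather than the .fa outputs):

def aggregate_input(wildcards):
    # Iterate over the global 'sample' list instead of relying on wildcards,
    # so the function also works for a rule that has no {sample} wildcard.
    fastas = []
    for s in sample:
        checkpoint_output = checkpoints.clustering.get(sample=s).output[0]
        fastas += expand(
            "lncRNA_fasta/{sample}/canidate_{sample}_{i}.fa",
            sample=s,
            i=glob_wildcards(os.path.join(checkpoint_output, s + "_{i}.gtf")).i)
    return fastas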

The rule that gathers the outputs of a checkpoint (combine_fasta_file, in your case) needs to contain the wildcards that are present in the output of the checkpoint rule (clustering, in your case). This is because Snakemake looks at the output you want and then tries to work backwards through your rules to the available input files. If the wildcards in the gathering rule don't include the checkpoint rule's wildcards, the exact checkpoint being referenced may be ambiguous.
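
A sketch of how that could look here (the per-sample path combined_fasta/{sample}_combined.fa and the extra merging rule are my additions, and the glob inside aggregate_input still needs to match the checkpoint's actual GTF outputs):

# Gather per sample, so that wildcards.sample is defined when Snakemake
# calls aggregate_input for this rule:
rule combine_fasta_file:
    input:
        aggregate_input
    output:
        "combined_fasta/{sample}_combined.fa"
    shell:
        "cat {input} > {output}"

# Then merge the per-sample files into the final target requested by rule all:
rule combine_all_samples:
    input:
        expand("combined_fasta/{sample}_combined.fa", sample=sample)
    output:
        "combined_fasta/all_fastas_combined.fa"
    shell:
        "cat {input} > {output}"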

To elaborate on Manavalan Gajapathy's comments about the Snakemake docs' checkpoint example, and on the multiple aggregations pabster212 needs, I have put together a minimal example that highlights a checkpoint followed by multiple gathering steps.

Input setup:

mkdir -p input/sampleA_run{1,2,3}
mkdir -p input/sampleB_run{4,5}
touch input/sampleA_run1/{apple,banana,cucumber,beet,basil}.txt
touch input/sampleA_run2/{alpha,beta,gamma}.txt
touch input/sampleA_run3/{red,blue,green,brown}.txt
touch input/sampleB_run4/{alice,betty,beth,barbara,christy}.txt
touch input/sampleB_run5/{alex,brad,bart,bennett,clive}.txt

Snakefile with checkpoint and multiple aggregations:

rule all:
    input:
        # Final
        expand("output/gathered_samples/{sample}.txt", sample = ["sampleA","sampleB"])

# My real-world analysis at this step (Nanopore guppy basecaller) 
# Takes as input a directory 
# With inconsistent numbers of files and names for files (.fast5 in real analysis)
# Due to filtering, there are more inputs than outputs (.fastq in real analysis)
# And the output filenames are not predictably named or numbered
# Therefore I use a checkpoint to evaluate which output files have been created
# In this toy example, only files beginning with 'b' are maintained
checkpoint unpredictable_file_generator:
    input:
        "input/{sample}_{run_id}"
    output:
        directory("output/unpredictable_files/{sample}_{run_id}"),
    shell:
        """
        mkdir -p {output}

        for some_file in {input}/*
        do
            outfile=$(basename "$some_file")
            if [[ "$outfile" =~ ^b.* ]]
            then
                touch "{output}/$outfile"
            fi
        done
        """

# For my real-world analysis, this intermediate step 
# Represents generation of bams from fastqs
# No intermediate step is necessarily required following the checkpoint
# Just modify the input-gathering function below (gather_from_checkpoint) 
# To expand 'output/unpredictable_files' instead of 'output/intermediate_files'
rule intermediate_step:
    input:
        "output/unpredictable_files/{sample}_{run_id}/{file_id}.txt"
    output:
        "output/intermediate_files/{sample}_{run_id}/{file_id}.txt"
    shell:
        """
        cp {input} {output}
        """

# Function to gather all outputs from checkpoint rule
# The output of the rule referring back to the checkpoint (gather_one, below)
# Must contain all the wildcards in the checkpoint rule
# To gather further (e.g. gather by sample rather than by run_id)
# An additional gather rule is required
def gather_from_checkpoint(wildcards):
    checkpoint_output = checkpoints.unpredictable_file_generator.get(**wildcards).output[0]
    return expand("output/intermediate_files/{sample}_{run_id}/{file_id}.txt",
           sample=wildcards.sample,
           run_id=wildcards.run_id,
           file_id=glob_wildcards(os.path.join(checkpoint_output, "{file_id}.txt")).file_id)

# Gathers all files by run_id
# But each sample is still divided into runs
# For my real-world analysis, this could represent a 'samtools merge bam'
rule gather_one:
    input:
        gather_from_checkpoint
    output:
        "output/gathered_runs/{sample}_{run_id}.txt"
    shell:
        """
        # Generate a file for each run 
        # Containing the names of all files output by 'unpredictable_file_generator'
        echo {input} | tr ' ' $'\n' > {output}
        """

# Gathers all previously-run_id-gathered files by sample
# For my real-world analysis, this could represent a second 'samtools merge bam'
# After which point the first merge could potentially be removed if it were marked 'temporary'
############
# Note wildcard restriction was required 
# To prevent the {run_id} match from descending into directories
# And generating 'ChildIOException' errors
rule gather_two:
    input:
        lambda wildcards: expand("output/gathered_runs/{sample}_{run_id}.txt",
            sample = wildcards.sample,
            run_id = glob_wildcards("input/" + wildcards.sample + "_{run_id,[^/]+}/").run_id)
    output:
        "output/gathered_samples/{sample}.txt"
    shell:
        """
        # This output should contain 6 filenames for each sample
        # Each beginning with 'b'
        cat {input} > {output}
        """