Snakemake Getting Checkpoint and Aggregate Function to Work
I'm having trouble with snakemake's aggregation step. I want to take a given GTF file, look for separate regions within it, and, if any are found, write those regions to separate files. As a result, I don't know in advance how many output GTF files each input GTF file will produce. To handle this, I'm trying to use snakemake checkpoints.
To do this, I've written a short script called collapse_gtf_file.py that simply takes a GTF file and generates N files corresponding to the number of individual regions found. So, given a file test_regions.gtf containing three regions, it would generate test_regions_1.gtf, test_regions_2.gtf, and test_regions_3.gtf.
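The script itself is not reproduced in the question; purely for context, here is a minimal sketch of what such a splitting script might look like. Everything in it is an assumption: the -gtf/-o argument names are only mirrored from the checkpoint rule further down, and the grouping key is a placeholder for however the real script identifies a region.

#!/usr/bin/env python
# Hypothetical sketch of collapse_gtf_file.py (the real script is not shown).
# Splits a GTF into one file per "region" found.
import argparse
from collections import defaultdict

parser = argparse.ArgumentParser()
parser.add_argument("-gtf", required=True, help="input GTF file")
parser.add_argument("-o", required=True, help="output prefix, e.g. out/test_regions")
args = parser.parse_args()

regions = defaultdict(list)
with open(args.gtf) as fh:
    for line in fh:
        if line.startswith("#"):
            continue
        fields = line.rstrip("\n").split("\t")
        # Placeholder grouping key: the attribute column (field 9); the real
        # script presumably has its own definition of a "region".
        regions[fields[8]].append(line)

# Writes <prefix>_1.gtf, <prefix>_2.gtf, ... one file per region found.
for n, lines in enumerate(regions.values(), start=1):
    with open(f"{args.o}_{n}.gtf", "w") as out:
        out.writelines(lines)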
After this splitting step, all of the GTF files should be converted to fasta files and aggregated.
However, I cannot get my checkpoint to work. I can get the toy example cases running, but when I try to build a larger pipeline around this checkpoint, it breaks.
So far I've tried following the checkpoint tutorial found here: https://snakemake.readthedocs.io/en/stable/snakefile/rules.html#dynamic-files
sample=config["samples"]
reference=config["reference"]

rule all:
    input:
        expand("string_tie_assembly/{sample}.gtf", sample=sample),
        expand("string_tie_assembly_merged/merged_{sample}.gtf", sample=sample),
        #expand("split_gtf_file/{sample}", sample=sample),
        #expand("lncRNA_fasta/{sample}.fa", sample=sample),
        "combined_fasta/all_fastas_combined.fa",
        "run_CPC2/cpc_calc_output.txt"

rule samtools_sort:
    input:
        "mapped_reads/{sample}.bam"
    output:
        "sorted_reads/{sample}.sorted.bam"
    shell:
        "samtools sort -T sorted_reads/{wildcards.sample} {input} > {output}"

rule samtools_index:
    input:
        "sorted_reads/{sample}.sorted.bam"
    output:
        "sorted_reads/{sample}.sorted.bam.bai"
    shell:
        "samtools index {input}"

rule generate_fastq:
    input:
        "sorted_reads/{sample}.sorted.bam"
    output:
        "fastq_reads/{sample}.fastq"
    shell:
        "samtools fastq {input} > {output}"

rule string_tie_assembly:
    input:
        "sorted_reads/{sample}.sorted.bam"
    output:
        "string_tie_assembly/{sample}.gtf"
    shell:
        "stringtie {input} -f 0.0 -a 0 -m 50 -c 3.0 -o {output}"

rule merge_gtf_file_features:
    input:
        "string_tie_assembly/{sample}.gtf"
    output:
        "string_tie_assembly_merged/merged_{sample}.gtf"
    shell:
        # prevents errors when there's no sequence
        """
        set +e
        stringtie --merge -o {output} -m 25 -c 3.0 {input}
        exitcode=$?
        if [ $exitcode -eq 1 ]
        then
            exit 0
        else
            exit 0
        fi
        """

#This is where the issue seems to arise from. Modeled after https://snakemake.readthedocs.io/en/stable/snakefile/rules.html#dynamic-files
checkpoint clustering:
    input:
        "string_tie_assembly_merged/merged_{sample}.gtf"
    output:
        clusters = directory("split_gtf_file/{sample}")
    shell:
        """
        mkdir -p split_gtf_file/{wildcards.sample}
        python collapse_gtf_file.py -gtf {input} -o split_gtf_file/{wildcards.sample}/{wildcards.sample}
        """

rule gtf_to_fasta:
    input:
        "split_gtf_file/{sample}/{sample}_{i}.gtf"
    output:
        "lncRNA_fasta/{sample}/canidate_{sample}_{i}.fa"
    wildcard_constraints:
        i=r"\d+"
    shell:
        "gffread -w {output} -g {reference} {input}"

def aggregate_input(wildcards):
    checkpoint_output = checkpoints.clustering.get(**wildcards).output[0]
    x = expand("lncRNA_fasta/{sample}/canidate_{sample}_{i}.fa",
               sample=wildcards.sample,
               i=glob_wildcards(os.path.join(checkpoint_output, "{i}.fa")).i)
    return x

rule combine_fasta_file:
    input:
        aggregate_input
    output:
        "combined_fasta/all_fastas_combined.fa"
    shell:
        "cat {input} > {output}"
Error Message:
InputFunctionException in combine_fasta_file:
WorkflowError: Missing wildcard values for sample
Wildcards:
To me this suggests that something is wrong with how I'm referring to wildcards in the aggregation function above, but I can't figure out what. Any pointers would be greatly appreciated.
The function aggregate_input requires the variable wildcards.sample, but you haven't specified any wildcards in rule combine_fasta_file. You may want to specify a wildcard in that rule, or refactor the function to use a global variable, whichever applies.
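For example, one way to apply that (a sketch only, not tested against the full pipeline): give combine_fasta_file a per-sample output so that {sample} is defined for the input function, and merge the per-sample results in a new final rule (combine_all_samples below is a name introduced here for illustration). Separately, note that the checkpoint directory contains files named {sample}_{i}.gtf, so globbing "{i}.fa" inside it will match nothing; the pattern needs to match the split GTFs:

def aggregate_input(wildcards):
    checkpoint_output = checkpoints.clustering.get(**wildcards).output[0]
    # Glob the split GTFs the checkpoint actually wrote for this sample
    return expand("lncRNA_fasta/{sample}/canidate_{sample}_{i}.fa",
                  sample=wildcards.sample,
                  i=glob_wildcards(os.path.join(
                      checkpoint_output, wildcards.sample + "_{i}.gtf")).i)

rule combine_fasta_file:
    input:
        aggregate_input
    output:
        # {sample} is now a wildcard of this rule, so Snakemake can pass it
        # to aggregate_input
        "combined_fasta/{sample}_combined.fa"
    shell:
        "cat {input} > {output}"

rule combine_all_samples:
    input:
        expand("combined_fasta/{sample}_combined.fa", sample=sample)
    output:
        "combined_fasta/all_fastas_combined.fa"
    shell:
        "cat {input} > {output}"

rule all can keep requesting combined_fasta/all_fastas_combined.fa as before.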
The rule that gathers the checkpoint's output (combine_fasta_file, in your case) needs to contain the wildcards that are present in the output of the checkpoint rule (clustering, in your case). This is because Snakemake looks at the output you want and then tries to work backwards through your rules to the available input files. If the wildcards in the gathering rule don't include those of the checkpoint rule, the exact checkpoint being referred to can be ambiguous.
To elaborate on Manavalan Gajapathy's comment about the Snakemake docs checkpoint example, and on the multiple aggregations pabster212 needs, I've put together a minimal example that highlights checkpoints and multiple gathering steps.
Input setup:
mkdir -p input/sampleA_run{1,2,3}
mkdir -p input/sampleB_run{4,5}
touch input/sampleA_run1/{apple,banana,cucumber,beet,basil}.txt
touch input/sampleA_run2/{alpha,beta,gamma}.txt
touch input/sampleA_run3/{red,blue,green,brown}.txt
touch input/sampleB_run4/{alice,betty,beth,barbara,christy}.txt
touch input/sampleB_run5/{alex,brad,bart,bennett,clive}.txt
Snakefile using a checkpoint and multiple gathers:
rule all:
    input:
        # Final
        expand("output/gathered_samples/{sample}.txt", sample=["sampleA", "sampleB"])

# My real-world analysis at this step (Nanopore guppy basecaller)
# takes a directory as input,
# with inconsistent numbers and names of files (.fast5 in the real analysis).
# Due to filtering, there are more inputs than outputs (.fastq in the real analysis),
# and the output filenames are not predictably named or numbered.
# Therefore I use a checkpoint to evaluate which output files have been created.
# In this toy example, only files beginning with 'b' are kept.
checkpoint unpredictable_file_generator:
    input:
        "input/{sample}_{run_id}"
    output:
        directory("output/unpredictable_files/{sample}_{run_id}")
    shell:
        """
        mkdir -p {output}
        for some_file in {input}/*
        do
            outfile=$(basename "$some_file")
            if [[ "$outfile" =~ ^b.* ]]
            then
                touch "{output}/$outfile"
            fi
        done
        """

# For my real-world analysis, this intermediate step
# represents generation of bams from fastqs.
# No intermediate step is strictly required after the checkpoint:
# just modify the input-gathering function below (gather_from_checkpoint)
# to expand 'output/unpredictable_files' instead of 'output/intermediate_files'.
rule intermediate_step:
    input:
        "output/unpredictable_files/{sample}_{run_id}/{file_id}.txt"
    output:
        "output/intermediate_files/{sample}_{run_id}/{file_id}.txt"
    shell:
        """
        cp {input} {output}
        """

# Function to gather all outputs from the checkpoint rule.
# The output of the rule referring back to the checkpoint (gather_one, below)
# must contain all the wildcards in the checkpoint rule.
# To gather further (e.g. by sample rather than by run_id),
# an additional gather rule is required.
def gather_from_checkpoint(wildcards):
    checkpoint_output = checkpoints.unpredictable_file_generator.get(**wildcards).output[0]
    return expand("output/intermediate_files/{sample}_{run_id}/{file_id}.txt",
                  sample=wildcards.sample,
                  run_id=wildcards.run_id,
                  file_id=glob_wildcards(os.path.join(checkpoint_output, "{file_id}.txt")).file_id)

# Gathers all files by run_id,
# but each sample is still divided into runs.
# For my real-world analysis, this could represent a 'samtools merge' of bams.
rule gather_one:
    input:
        gather_from_checkpoint
    output:
        "output/gathered_runs/{sample}_{run_id}.txt"
    shell:
        """
        # Generate a file for each run
        # containing the names of all files output by 'unpredictable_file_generator'
        echo {input} | tr ' ' $'\n' > {output}
        """

# Gathers all previously-run_id-gathered files by sample.
# For my real-world analysis, this could represent a second 'samtools merge',
# after which the first merge could potentially be removed if it were marked 'temporary'.
############
# Note: the wildcard restriction is required
# to prevent the {run_id} match from descending into directories
# and generating 'ChildIOException' errors.
rule gather_two:
    input:
        lambda wildcards: expand("output/gathered_runs/{sample}_{run_id}.txt",
                                 sample=wildcards.sample,
                                 run_id=glob_wildcards("input/" + wildcards.sample + "_{run_id,[^/]+}/").run_id)
    output:
        "output/gathered_samples/{sample}.txt"
    shell:
        """
        # This output should contain 6 filenames for each sample,
        # each beginning with 'b'
        cat {input} > {output}
        """