如何从工作流中某个点创建的文件子集继续 snakemake 工作流? (试过检查点)
How can I continue a snakemake workflow from a subset of files created at some point in the workflow? (tried checkpoint)
在我的工作流程中的某个时刻,我选择了一部分文件。我想使用该文件子集继续我工作流程中的后续步骤:
rule all:
input:
"results/selected_files.tsv",
checkpoint select_by_size:
input:
"results/selected_seqs"
output:
directory("results/selected_seqs_by_size")
shell:
"""
mkdir -p {output[0]}
# The second -size is only for testing, remove it for a real run
find {input} -size +302c -size -304c -exec cp {{}} {output[0]} \;
"""
def get_selected_files(wildcards):
ck_output = checkpoints.select_by_size.get(**wildcards).output[0]
GENES, = glob_wildcards(join(ck_output, "{gene}.fasta"))
return expand(join(ck_output, "{GENE}.fasta"), GENE=GENES)
rule create_table_sel:
input:
get_selected_files
output:
"results/selected_files.tsv"
run:
with open(output[0], 'w') as fh:
for f in input:
print(Path(f).stem, f, file=fh, sep='\t')
规则 create_table_sel
创建一个 table 如下所示:
JLEOKLFN_00589 results/selected_seqs_by_size/JLEOKLFN_00589.fasta
JLEOKLFN_01812 results/selected_seqs_by_size/JLEOKLFN_01812.fasta
如果我尝试使用此 table 将单个文件放入以下规则,它会失败(将 "results/a3ms.txt"
添加到 rule all
)
def get_seqs(wildcards):
df = pd.read_csv("results/selected_files.tsv", sep="\t", header=None, index_col=0)
return df.loc[wildcards.gene].values[0]
rule hhblits_msa:
input:
get_seqs
# "results/selected_seqs_by_size/{gene}.fasta"
output:
"results/hhblits_msa/{gene}.a3m"
log:
"logs/hhblits/{gene}_msa.log"
params:
db=config["msa_db"],
n_iter= 2
threads: 4
shell:
"""
mkdir -p results/hhblits_msa
hhblits -i {input[0]} -d {params.db} -oa3m {output[0]} -cpu {threads} -n {params.n_iter} \
&> {log}
"""
# The intention is to use the above rule as input
rule agg:
input:
"results/hhblits_msa/{gene}.a3m"
output:
"results/a3ms.txt"
shell:
"echo {input} >> {output}"
Wildcards in input files cannot be determined from output files:
'gene'
如果我使用类似 snakemake --cores 4 results/hhblits_msa/JLEOKLFN_00589.a3m
的东西,它运行没有问题,但我当然想使用这个工作流而不需要在命令行中指定输出文件。
如何将规则 hhblits_msa
添加到我的工作流中?我如何才能从同一工作流程中某个时间点创建的文件子集继续任何工作流程?
此外,某种程度上相关的是,是否可以将 checkpoints
对象用于 return 单个文件,而不仅仅是 return 所有带有 expand
的文件?我尝试了一些东西但失败了,因为无法获得通配符。这个想法是使用该单个文件作为任何规则的输入,例如 hhblits_msa
.
已添加
问题是使用在 select_by_size
中创建的文件作为 hhblits_msa
的输入。鉴于这些文件是在相同的工作流程中创建的,而且我事先不知道它们,所以我不能将它们包含在 all
中。当我使用此工作流或 建议的工作流时,规则 hhblits_msa
没有被执行。
我不太确定你的工作流程设计,但我觉得你不需要 create_table_sel
规则。
您可能希望将输入函数移至规则 agg
,同时将输入函数修改为 return agg
的预期输入,示例:
def get_selected_files(wildcards):
ck_output = checkpoints.select_by_size.get(**wildcards).output[0]
GENES, = glob_wildcards(join(ck_output, "{gene}.fasta"))
return expand("results/hhblits_msa/{GENE}.a3m", GENE=GENES)
然后根据此更改,您的 hhblits_msa
规则应如下所示:
rule hhblits_msa:
input:
"results/selected_seqs_by_size/{gene}.fasta"
output:
"results/hhblits_msa/{gene}.a3m"
... # log, parameters and shell command
snakemake的执行过程是这样的:
1.agg
-> 2.input 函数 get_selected_files
-> 3.execute 检查点 -> 4. 执行 hhblits_msa
-> 执行 agg
在我的工作流程中的某个时刻,我选择了一部分文件。我想使用该文件子集继续我工作流程中的后续步骤:
rule all:
input:
"results/selected_files.tsv",
checkpoint select_by_size:
input:
"results/selected_seqs"
output:
directory("results/selected_seqs_by_size")
shell:
"""
mkdir -p {output[0]}
# The second -size is only for testing, remove it for a real run
find {input} -size +302c -size -304c -exec cp {{}} {output[0]} \;
"""
def get_selected_files(wildcards):
ck_output = checkpoints.select_by_size.get(**wildcards).output[0]
GENES, = glob_wildcards(join(ck_output, "{gene}.fasta"))
return expand(join(ck_output, "{GENE}.fasta"), GENE=GENES)
rule create_table_sel:
input:
get_selected_files
output:
"results/selected_files.tsv"
run:
with open(output[0], 'w') as fh:
for f in input:
print(Path(f).stem, f, file=fh, sep='\t')
规则 create_table_sel
创建一个 table 如下所示:
JLEOKLFN_00589 results/selected_seqs_by_size/JLEOKLFN_00589.fasta
JLEOKLFN_01812 results/selected_seqs_by_size/JLEOKLFN_01812.fasta
如果我尝试使用此 table 将单个文件放入以下规则,它会失败(将 "results/a3ms.txt"
添加到 rule all
)
def get_seqs(wildcards):
df = pd.read_csv("results/selected_files.tsv", sep="\t", header=None, index_col=0)
return df.loc[wildcards.gene].values[0]
rule hhblits_msa:
input:
get_seqs
# "results/selected_seqs_by_size/{gene}.fasta"
output:
"results/hhblits_msa/{gene}.a3m"
log:
"logs/hhblits/{gene}_msa.log"
params:
db=config["msa_db"],
n_iter= 2
threads: 4
shell:
"""
mkdir -p results/hhblits_msa
hhblits -i {input[0]} -d {params.db} -oa3m {output[0]} -cpu {threads} -n {params.n_iter} \
&> {log}
"""
# The intention is to use the above rule as input
rule agg:
input:
"results/hhblits_msa/{gene}.a3m"
output:
"results/a3ms.txt"
shell:
"echo {input} >> {output}"
Wildcards in input files cannot be determined from output files:
'gene'
如果我使用类似 snakemake --cores 4 results/hhblits_msa/JLEOKLFN_00589.a3m
的东西,它运行没有问题,但我当然想使用这个工作流而不需要在命令行中指定输出文件。
如何将规则 hhblits_msa
添加到我的工作流中?我如何才能从同一工作流程中某个时间点创建的文件子集继续任何工作流程?
此外,某种程度上相关的是,是否可以将 checkpoints
对象用于 return 单个文件,而不仅仅是 return 所有带有 expand
的文件?我尝试了一些东西但失败了,因为无法获得通配符。这个想法是使用该单个文件作为任何规则的输入,例如 hhblits_msa
.
已添加
问题是使用在 select_by_size
中创建的文件作为 hhblits_msa
的输入。鉴于这些文件是在相同的工作流程中创建的,而且我事先不知道它们,所以我不能将它们包含在 all
中。当我使用此工作流或 hhblits_msa
没有被执行。
我不太确定你的工作流程设计,但我觉得你不需要 create_table_sel
规则。
您可能希望将输入函数移至规则 agg
,同时将输入函数修改为 return agg
的预期输入,示例:
def get_selected_files(wildcards):
ck_output = checkpoints.select_by_size.get(**wildcards).output[0]
GENES, = glob_wildcards(join(ck_output, "{gene}.fasta"))
return expand("results/hhblits_msa/{GENE}.a3m", GENE=GENES)
然后根据此更改,您的 hhblits_msa
规则应如下所示:
rule hhblits_msa:
input:
"results/selected_seqs_by_size/{gene}.fasta"
output:
"results/hhblits_msa/{gene}.a3m"
... # log, parameters and shell command
snakemake的执行过程是这样的:
1.agg
-> 2.input 函数 get_selected_files
-> 3.execute 检查点 -> 4. 执行 hhblits_msa
-> 执行 agg