当并非所有作业都成功输出先前规则的文件时,如何编写 snakemake 输入?
How do I write a snakemake input when not all jobs successfully output files from previous rule?
基本上,我有三个 snakemake 规则(除了 rule all)而且无法解决这个问题,尽管有检查点资源。
第一个规则是我的第一个也是唯一一个开始的文件。它将有 x 个输出(数量因输入文件而异)。这些 x 输出中的每一个都需要在规则 2 中单独处理,这意味着规则 2 将 运行 x 个作业。但是,这些作业中只有部分子集 y 会产生输出(软件只会为通过特定阈值的输入写出文件)。所以,虽然
我希望每个输出 运行 作为作业 3 中的一个单独作业,我不知道规则 2 会产生多少文件。规则三也将 运行 y 个作业,每个一个规则 2 的成功输出。我有两个问题。第一个是我如何编写规则 3 的输入,不知道规则 2 会产生多少文件?第二个问题是,当输入文件没有相应数量的输出文件时,我如何“告诉”规则 2 已经完成?如果我添加第四条规则,我想它会尝试在没有获得输出文件的作业上重新 运行 规则二,这永远不会产生输出。也许我在设置检查点时遗漏了什么?
类似于:
rule a:
input: file.vcf
output: dummy.txt
shell:"""
.... make unknown number of output files (x) x_1 , x_2, ..., x_n
"""
#run a separate job from each output of rule a
rule b:
input: x_1 #not sure how many are going to be inputs here
output: y_1 #not sure how many output files will be here
shell:"""
some of the x inputs will output their corresponding y, but others will have no output
"""
#run a separate job for each output of rule b
rule c:
input: y_1 #not sure how many input files here
output: z_1
您应该将 rule a
更改为评论中提到的检查点。 Rule b
将为每个输入生成一个输出,可以保留原样,与本例中的 rule c
相同。
最终,您将拥有一个类似聚合的规则来决定需要哪些输出。它可能是规则 d,也可能最终成为规则 all。无论哪种方式,聚合规则都需要一个输入函数来调用检查点以确定存在哪些文件。如果你跟着the example,你会得到类似的东西:
checkpoint a:
input: file.vcf
output: directory('output_dir')
shell:"""
mkdir {output} # then put all the output files here!
.... make unknown number of output files (x) x_1 , x_2, ..., x_n
"""
#run a separate job from each output of rule a
rule b:
input: output_dir/x_{n}
output: y_{n}
shell:"""
some of the x inputs will output their corresponding y, but others will have no output
"""
#run a separate job for each output of rule b
rule c:
input: y_{n}
output: z_{n}
# input function for the rule aggregate
def aggregate_input(wildcards):
checkpoint_output = checkpoints.a.get(**wildcards).output[0]
return expand("z_{i}",
i=glob_wildcards(os.path.join(checkpoint_output, "x_{i}.txt")).i)
rule aggregate: # what do you do with all the z files? could be all
input: aggregate_input
如果您将工作流想象成一棵树,则规则 a 是 b运行ching 和可变数量的 b运行ches。规则 b 和 c 是一对一的映射。聚合将所有 b运行ches 重新组合在一起,并负责检查存在多少 b运行ches。规则 b 和 c 只看到一个 input/output 并且不关心有多少其他 b运行ches。
编辑以回答评论中的问题并修复了我的代码中的几个错误:
I still get confused here though, because rule b will not have as many outputs as inputs, so won't rule aggregate never run until all of the wildcards from the output of checkpoint a are present in z_{n}, which they never would be?
这令人困惑,因为这不是 snakemake 通常的工作方式,并且会导致很多关于 SO 的问题。您需要记住的是,当 checkpoints.<rule>.get
为 运行 时,该步骤的计算有效暂停。考虑 i == [1, 2, 3]
的三个值的简单情况,但在 checkpoint a
中只创建了 i == 2 and 3
。我们知道 DAG 看起来像这样:
rule file
input file.vcf
/ | \
a x_2 x_3
| |
b y_2 y_3
| |
c z_2 z_3
\ /
aggregate OUTPUT
checkpoint a
中缺少 x_1
。但是,snakemake 不知道 checkpoint a
的行为,只是它会生成一个目录作为输出,并且(因为它是一个检查点)一旦完成,DAG 将被重新评估。所以如果你 运行 snakemake -nq
你会看到 checkpoint a
而 aggregate
会 运行,但不会提到 b
或 c
。那时,这些是 snakemake 知道并计划 运行 的唯一规则。调用 checkpoint.<rule>.get
基本上是说“在这里等一下,在这条规则之后你将不得不看看做了什么”。
所以当 snakemake 第一次启动 运行ning 你的工作流程时,DAG 看起来像这样:
rule file
input file.vcf
|
a ...
|
???? ...
|
aggregate OUTPUT
Snakemake 不知道规则 a
和 aggregate
之间发生了什么,只是它需要 运行 a
才能知道。
rule file
input file.vcf
/ | \
a x_2 x_3
???? ...
|
aggregate OUTPUT
检查点 a
已安排,运行,现在重新评估 DAG。 aggregate_input
的其余部分查看 与 glob_wildcards
一起出现的文件,然后使用该信息来决定它需要哪些文件。请注意,扩展正在请求来自 rule c
的输出,这需要 rule b
,这需要 x_{n}
,现在检查点有 运行。现在snakemake可以构造出你想要的DAG了
下面是输入函数,其中包含更多注释,希望能说明清楚:
def aggregate_input(wildcards):
# say this rule depends on a checkpoint. DAG evaulation pauses here
checkpoint_output = checkpoints.a.get(**wildcards).output[0]
# at this point, checkpoint a has completed and the output (directory)
# is in checkpoint_output. Some number of files are there
# use glob_wildcards to find the x_{i} files that actually exist
found_files = glob_wildcards(os.path.join(checkpoint_output, "x_{i}.txt")).i
# now we know we need all the z files to be created *if* a x file exists.
return expand("z_{i}", i=found_files)
基本上,我有三个 snakemake 规则(除了 rule all)而且无法解决这个问题,尽管有检查点资源。
第一个规则是我的第一个也是唯一一个开始的文件。它将有 x 个输出(数量因输入文件而异)。这些 x 输出中的每一个都需要在规则 2 中单独处理,这意味着规则 2 将 运行 x 个作业。但是,这些作业中只有部分子集 y 会产生输出(软件只会为通过特定阈值的输入写出文件)。所以,虽然 我希望每个输出 运行 作为作业 3 中的一个单独作业,我不知道规则 2 会产生多少文件。规则三也将 运行 y 个作业,每个一个规则 2 的成功输出。我有两个问题。第一个是我如何编写规则 3 的输入,不知道规则 2 会产生多少文件?第二个问题是,当输入文件没有相应数量的输出文件时,我如何“告诉”规则 2 已经完成?如果我添加第四条规则,我想它会尝试在没有获得输出文件的作业上重新 运行 规则二,这永远不会产生输出。也许我在设置检查点时遗漏了什么?
类似于:
rule a:
input: file.vcf
output: dummy.txt
shell:"""
.... make unknown number of output files (x) x_1 , x_2, ..., x_n
"""
#run a separate job from each output of rule a
rule b:
input: x_1 #not sure how many are going to be inputs here
output: y_1 #not sure how many output files will be here
shell:"""
some of the x inputs will output their corresponding y, but others will have no output
"""
#run a separate job for each output of rule b
rule c:
input: y_1 #not sure how many input files here
output: z_1
您应该将 rule a
更改为评论中提到的检查点。 Rule b
将为每个输入生成一个输出,可以保留原样,与本例中的 rule c
相同。
最终,您将拥有一个类似聚合的规则来决定需要哪些输出。它可能是规则 d,也可能最终成为规则 all。无论哪种方式,聚合规则都需要一个输入函数来调用检查点以确定存在哪些文件。如果你跟着the example,你会得到类似的东西:
checkpoint a:
input: file.vcf
output: directory('output_dir')
shell:"""
mkdir {output} # then put all the output files here!
.... make unknown number of output files (x) x_1 , x_2, ..., x_n
"""
#run a separate job from each output of rule a
rule b:
input: output_dir/x_{n}
output: y_{n}
shell:"""
some of the x inputs will output their corresponding y, but others will have no output
"""
#run a separate job for each output of rule b
rule c:
input: y_{n}
output: z_{n}
# input function for the rule aggregate
def aggregate_input(wildcards):
checkpoint_output = checkpoints.a.get(**wildcards).output[0]
return expand("z_{i}",
i=glob_wildcards(os.path.join(checkpoint_output, "x_{i}.txt")).i)
rule aggregate: # what do you do with all the z files? could be all
input: aggregate_input
如果您将工作流想象成一棵树,则规则 a 是 b运行ching 和可变数量的 b运行ches。规则 b 和 c 是一对一的映射。聚合将所有 b运行ches 重新组合在一起,并负责检查存在多少 b运行ches。规则 b 和 c 只看到一个 input/output 并且不关心有多少其他 b运行ches。
编辑以回答评论中的问题并修复了我的代码中的几个错误:
I still get confused here though, because rule b will not have as many outputs as inputs, so won't rule aggregate never run until all of the wildcards from the output of checkpoint a are present in z_{n}, which they never would be?
这令人困惑,因为这不是 snakemake 通常的工作方式,并且会导致很多关于 SO 的问题。您需要记住的是,当 checkpoints.<rule>.get
为 运行 时,该步骤的计算有效暂停。考虑 i == [1, 2, 3]
的三个值的简单情况,但在 checkpoint a
中只创建了 i == 2 and 3
。我们知道 DAG 看起来像这样:
rule file
input file.vcf
/ | \
a x_2 x_3
| |
b y_2 y_3
| |
c z_2 z_3
\ /
aggregate OUTPUT
checkpoint a
中缺少 x_1
。但是,snakemake 不知道 checkpoint a
的行为,只是它会生成一个目录作为输出,并且(因为它是一个检查点)一旦完成,DAG 将被重新评估。所以如果你 运行 snakemake -nq
你会看到 checkpoint a
而 aggregate
会 运行,但不会提到 b
或 c
。那时,这些是 snakemake 知道并计划 运行 的唯一规则。调用 checkpoint.<rule>.get
基本上是说“在这里等一下,在这条规则之后你将不得不看看做了什么”。
所以当 snakemake 第一次启动 运行ning 你的工作流程时,DAG 看起来像这样:
rule file
input file.vcf
|
a ...
|
???? ...
|
aggregate OUTPUT
Snakemake 不知道规则 a
和 aggregate
之间发生了什么,只是它需要 运行 a
才能知道。
rule file
input file.vcf
/ | \
a x_2 x_3
???? ...
|
aggregate OUTPUT
检查点 a
已安排,运行,现在重新评估 DAG。 aggregate_input
的其余部分查看 与 glob_wildcards
一起出现的文件,然后使用该信息来决定它需要哪些文件。请注意,扩展正在请求来自 rule c
的输出,这需要 rule b
,这需要 x_{n}
,现在检查点有 运行。现在snakemake可以构造出你想要的DAG了
下面是输入函数,其中包含更多注释,希望能说明清楚:
def aggregate_input(wildcards):
# say this rule depends on a checkpoint. DAG evaulation pauses here
checkpoint_output = checkpoints.a.get(**wildcards).output[0]
# at this point, checkpoint a has completed and the output (directory)
# is in checkpoint_output. Some number of files are there
# use glob_wildcards to find the x_{i} files that actually exist
found_files = glob_wildcards(os.path.join(checkpoint_output, "x_{i}.txt")).i
# now we know we need all the z files to be created *if* a x file exists.
return expand("z_{i}", i=found_files)