在不使用已弃用动态 API 的情况下构建具有动态输入的工作流
Building a workflow with dynamic input without using deprecated dynamic API
我想创建一个工作流程,从远程服务器下载一些 FASTQ 文件的列表,检查 md5 并运行一些 post 处理,例如对齐。
我了解如何使用两个工作流程来实现:
首先下载fastq文件列表文件,例如md5
文件.
读取 md5
文件内容并在 all
规则中为所需的结果文件创建相应的目标。
我想在一个工作流程中完成这项工作。下面不正确的工作流程显示了我想要实现的想法。
在 all
规则 input:
部分我不知道 {sample}
在 md5
文件下载和解析之前的值
我试过动态、检查点和子分支流,但未能达到预期的结果。至于 dynamic
我只设法为动态("fastq/{sample}.fq.gz.md5")输出实现了这个工作流。
此外,我对不使用 dynamic
的解决方案感兴趣,因为它已被弃用。
rule all:
input:
"md5",
"bams/{sample}.bam",
rule download_files_list:
output: "md5"
#shell: "wget {}".format(config["url_files_list"])
run:
# For testing instead of downloading:
content = """
bfe583337fd68b3 ID_001_1.fq.gz
1636b6756daa65f ID_001_2.fq.gz
0428baf25307249 ID_002_1.fq.gz
de33d81ba5bfa62 ID_002_2.fq.gz
""".strip()
with open(output[0], mode="w") as f:
print(content, file=f)
rule fastq_md5_files:
input: "md5"
output: "fastq/{sample}.fq.gz.md5"
shell: "mkdir -p fastq && awk '{{ print [=11=] > (\"fastq/\" \".md5\") }}' {input}"
rule download_fastq_and_check_md5:
input: "fastq/{sample}.fq.gz.md5"
output: "fastq/{sample}.fq.gz"
#shell: "wget {}/{{sample}} && md5sum --check {{input}}".format(config["url_file_prefix"])
shell: "touch {output}"
rule align_fastq:
input: "fastq/{sample}.fq.gz"
output: "bams/{sample}.bam"
shell: "touch {output}" # aligning task
您可以下载 fastq 文件列表,并在 snakemake 规则生效之前使用纯 python 代码从那里提取样本列表:
def download_files_list(output):
"""Download the list of fastq files and return the list
of samples
"""
content = """
bfe583337fd68b3 ID_001_1.fq.gz
1636b6756daa65f ID_001_2.fq.gz
0428baf25307249 ID_002_1.fq.gz
de33d81ba5bfa62 ID_002_2.fq.gz
""".strip()
with open(output, mode="w") as f:
print(content, file=f)
return ['ID_001_1', 'ID_001_2', 'ID_002_1', 'ID_002_2']
samples= download_files_list("md5")
wildcard_constraints:
sample= '|'.join([re.escape(x) for x in samples]),
rule all:
input:
expand("bams/{sample}.bam", sample= samples),
rule fastq_md5_files:
input: "md5"
output: "fastq/{sample}.fq.gz.md5"
shell: """awk '{{ print [=10=] > ("fastq/" ".md5") }}' {input}"""
rule download_fastq_and_check_md5:
input: "fastq/{sample}.fq.gz.md5"
output: "fastq/{sample}.fq.gz"
#shell: "wget {}/{{sample}} && md5sum --check {{input}}".format(config["url_file_prefix"])
shell: "touch {output}"
rule align_fastq:
input: "fastq/{sample}.fq.gz"
output: "bams/{sample}.bam"
shell: "touch {output}" # aligning task
(我很好奇自己使用检查点或类似方法的更像 snakemake 的解决方案)
我看到很多关于如何使用 the new checkpoint
feature 的困惑。这是一个简化的说明性示例:
shell.prefix('set -vexu pipefail; ')
rule finish:
input:
"D/all.txt"
checkpoint A:
output:
mydir = directory("A")
shell: """
mkdir -p A
N=$(( $RANDOM % 7 + 1))
echo "N=$N"
# Create a number of files. (
for i in $(seq 1 $N); do
echo $i > "A/$i.txt"
done
"""
rule B:
output:
txt = "B/{i}.txt",
input:
txt = "A/{i}.txt",
shell: """
mkdir -p B
cp -f {input.txt} {output.txt}
"""
rule C:
output:
txt = "C/{i}.txt",
input:
txt = "B/{i}.txt",
shell: """
mkdir -p C
cp -f {input.txt} {output.txt}
"""
def gatherForD_fromC_basedOnA(wildcards):
checkpoint_output = checkpoints.A.get(**wildcards).output.mydir
# That will raise if not ready.
ivals = glob_wildcards(os.path.join(checkpoint_output,
"{i}.txt")).i
print("ivals={}".format(ivals))
return expand("C/{i}.txt", i=ivals)
rule D:
output:
combined = "D/all.txt",
input:
gathered = gatherForD_fromC_basedOnA,
shell: """
mkdir -p D
cat {input.gathered} > {output.combined}
"""
复制到 snakefile
并用
运行
snakemake --verbose -p
- Checkpoint/Rule
A
输出随机数量的文件。 (当然,您可以将它们基于 "input" 部分。)
- 规则
B
和C
是使用标准snakemake"wildcards". 的并行规则
- 规则
D
基于 input-generating 函数接受未知数量的输入。
- 函数
gatherForD_fromC_basedOnA
等待checkpoint-ruleA
的输出,但它命名规则C
的输出,最终被规则 D
消耗掉。结果,snakemake 知道 D
将消耗什么(在 A
完成后)。所以它知道 C
必须产生什么。所以它知道 B
必须产生什么。
- 最后,规则
finish
等待特定的已知文件。
我想创建一个工作流程,从远程服务器下载一些 FASTQ 文件的列表,检查 md5 并运行一些 post 处理,例如对齐。
我了解如何使用两个工作流程来实现:
首先下载fastq文件列表文件,例如
md5
文件.读取
md5
文件内容并在all
规则中为所需的结果文件创建相应的目标。
我想在一个工作流程中完成这项工作。下面不正确的工作流程显示了我想要实现的想法。
在
all
规则input:
部分我不知道{sample}
在md5
文件下载和解析之前的值我试过动态、检查点和子分支流,但未能达到预期的结果。至于
dynamic
我只设法为动态("fastq/{sample}.fq.gz.md5")输出实现了这个工作流。此外,我对不使用
dynamic
的解决方案感兴趣,因为它已被弃用。
rule all:
input:
"md5",
"bams/{sample}.bam",
rule download_files_list:
output: "md5"
#shell: "wget {}".format(config["url_files_list"])
run:
# For testing instead of downloading:
content = """
bfe583337fd68b3 ID_001_1.fq.gz
1636b6756daa65f ID_001_2.fq.gz
0428baf25307249 ID_002_1.fq.gz
de33d81ba5bfa62 ID_002_2.fq.gz
""".strip()
with open(output[0], mode="w") as f:
print(content, file=f)
rule fastq_md5_files:
input: "md5"
output: "fastq/{sample}.fq.gz.md5"
shell: "mkdir -p fastq && awk '{{ print [=11=] > (\"fastq/\" \".md5\") }}' {input}"
rule download_fastq_and_check_md5:
input: "fastq/{sample}.fq.gz.md5"
output: "fastq/{sample}.fq.gz"
#shell: "wget {}/{{sample}} && md5sum --check {{input}}".format(config["url_file_prefix"])
shell: "touch {output}"
rule align_fastq:
input: "fastq/{sample}.fq.gz"
output: "bams/{sample}.bam"
shell: "touch {output}" # aligning task
您可以下载 fastq 文件列表,并在 snakemake 规则生效之前使用纯 python 代码从那里提取样本列表:
def download_files_list(output):
"""Download the list of fastq files and return the list
of samples
"""
content = """
bfe583337fd68b3 ID_001_1.fq.gz
1636b6756daa65f ID_001_2.fq.gz
0428baf25307249 ID_002_1.fq.gz
de33d81ba5bfa62 ID_002_2.fq.gz
""".strip()
with open(output, mode="w") as f:
print(content, file=f)
return ['ID_001_1', 'ID_001_2', 'ID_002_1', 'ID_002_2']
samples= download_files_list("md5")
wildcard_constraints:
sample= '|'.join([re.escape(x) for x in samples]),
rule all:
input:
expand("bams/{sample}.bam", sample= samples),
rule fastq_md5_files:
input: "md5"
output: "fastq/{sample}.fq.gz.md5"
shell: """awk '{{ print [=10=] > ("fastq/" ".md5") }}' {input}"""
rule download_fastq_and_check_md5:
input: "fastq/{sample}.fq.gz.md5"
output: "fastq/{sample}.fq.gz"
#shell: "wget {}/{{sample}} && md5sum --check {{input}}".format(config["url_file_prefix"])
shell: "touch {output}"
rule align_fastq:
input: "fastq/{sample}.fq.gz"
output: "bams/{sample}.bam"
shell: "touch {output}" # aligning task
(我很好奇自己使用检查点或类似方法的更像 snakemake 的解决方案)
我看到很多关于如何使用 the new checkpoint
feature 的困惑。这是一个简化的说明性示例:
shell.prefix('set -vexu pipefail; ')
rule finish:
input:
"D/all.txt"
checkpoint A:
output:
mydir = directory("A")
shell: """
mkdir -p A
N=$(( $RANDOM % 7 + 1))
echo "N=$N"
# Create a number of files. (
for i in $(seq 1 $N); do
echo $i > "A/$i.txt"
done
"""
rule B:
output:
txt = "B/{i}.txt",
input:
txt = "A/{i}.txt",
shell: """
mkdir -p B
cp -f {input.txt} {output.txt}
"""
rule C:
output:
txt = "C/{i}.txt",
input:
txt = "B/{i}.txt",
shell: """
mkdir -p C
cp -f {input.txt} {output.txt}
"""
def gatherForD_fromC_basedOnA(wildcards):
checkpoint_output = checkpoints.A.get(**wildcards).output.mydir
# That will raise if not ready.
ivals = glob_wildcards(os.path.join(checkpoint_output,
"{i}.txt")).i
print("ivals={}".format(ivals))
return expand("C/{i}.txt", i=ivals)
rule D:
output:
combined = "D/all.txt",
input:
gathered = gatherForD_fromC_basedOnA,
shell: """
mkdir -p D
cat {input.gathered} > {output.combined}
"""
复制到 snakefile
并用
snakemake --verbose -p
- Checkpoint/Rule
A
输出随机数量的文件。 (当然,您可以将它们基于 "input" 部分。) - 规则
B
和C
是使用标准snakemake"wildcards". 的并行规则
- 规则
D
基于 input-generating 函数接受未知数量的输入。 - 函数
gatherForD_fromC_basedOnA
等待checkpoint-ruleA
的输出,但它命名规则C
的输出,最终被规则D
消耗掉。结果,snakemake 知道D
将消耗什么(在A
完成后)。所以它知道C
必须产生什么。所以它知道B
必须产生什么。 - 最后,规则
finish
等待特定的已知文件。