在不使用已弃用动态 API 的情况下构建具有动态输入的工作流

Building a workflow with dynamic input without using deprecated dynamic API

我想创建一个工作流程,从远程服务器下载一些 FASTQ 文件的列表,检查 md5 并运行一些 post 处理,例如对齐。

我了解如何使用两个工作流程来实现:

  1. 首先下载fastq文件列表文件,例如md5 文件.

  2. 读取 md5 文件内容并在 all 规则中为所需的结果文件创建相应的目标。

我想在一个工作流程中完成这项工作。下面不正确的工作流程显示了我想要实现的想法。

rule all:
    input:
         "md5",
         "bams/{sample}.bam",

rule download_files_list:
    output: "md5"
    #shell: "wget {}".format(config["url_files_list"])
    run:
        # For testing instead of downloading:
        content = """
        bfe583337fd68b3  ID_001_1.fq.gz
        1636b6756daa65f  ID_001_2.fq.gz
        0428baf25307249  ID_002_1.fq.gz
        de33d81ba5bfa62  ID_002_2.fq.gz
        """.strip()
        with open(output[0], mode="w") as f:
            print(content, file=f)

rule fastq_md5_files:
    input: "md5"
    output: "fastq/{sample}.fq.gz.md5"
    shell: "mkdir -p fastq && awk '{{ print [=11=] > (\"fastq/\"  \".md5\") }}' {input}"

rule download_fastq_and_check_md5:
    input: "fastq/{sample}.fq.gz.md5"
    output: "fastq/{sample}.fq.gz"
    #shell: "wget {}/{{sample}} && md5sum --check {{input}}".format(config["url_file_prefix"])
    shell: "touch {output}" 

rule align_fastq:
    input: "fastq/{sample}.fq.gz"
    output: "bams/{sample}.bam"
    shell: "touch {output}" # aligning task

您可以下载 fastq 文件列表,并在 snakemake 规则生效之前使用纯 python 代码从那里提取样本列表:

def download_files_list(output):
    """Download the list of fastq files and return the list
    of samples
    """
    content = """
    bfe583337fd68b3  ID_001_1.fq.gz
    1636b6756daa65f  ID_001_2.fq.gz
    0428baf25307249  ID_002_1.fq.gz
    de33d81ba5bfa62  ID_002_2.fq.gz
    """.strip()
    with open(output, mode="w") as f:
        print(content, file=f)
    return ['ID_001_1', 'ID_001_2', 'ID_002_1', 'ID_002_2']    

samples= download_files_list("md5")
wildcard_constraints:
    sample= '|'.join([re.escape(x) for x in samples]),

rule all:
    input:
         expand("bams/{sample}.bam", sample= samples),

rule fastq_md5_files:
    input: "md5"
    output: "fastq/{sample}.fq.gz.md5"
    shell: """awk '{{ print [=10=] > ("fastq/"  ".md5") }}' {input}"""

rule download_fastq_and_check_md5:
    input: "fastq/{sample}.fq.gz.md5"
    output: "fastq/{sample}.fq.gz"
    #shell: "wget {}/{{sample}} && md5sum --check {{input}}".format(config["url_file_prefix"])
    shell: "touch {output}" 

rule align_fastq:
    input: "fastq/{sample}.fq.gz"
    output: "bams/{sample}.bam"
    shell: "touch {output}" # aligning task

(我很好奇自己使用检查点或类似方法的更像 snakemake 的解决方案)

我看到很多关于如何使用 the new checkpoint feature 的困惑。这是一个简化的说明性示例:

shell.prefix('set -vexu pipefail; ')

rule finish:
        input:
                "D/all.txt"

checkpoint A:
        output:
                mydir = directory("A")
        shell: """
                mkdir -p A
                N=$(( $RANDOM % 7 + 1))
                echo "N=$N"
                # Create a number of files. (
                for i in $(seq 1 $N); do
                        echo $i > "A/$i.txt"
                done
        """

rule B:
        output:
                txt = "B/{i}.txt",
        input:
                txt = "A/{i}.txt",
        shell: """
                mkdir -p B
                cp -f {input.txt} {output.txt}
        """

rule C:
        output:
                txt = "C/{i}.txt",
        input:
                txt = "B/{i}.txt",
        shell: """
                mkdir -p C
                cp -f {input.txt} {output.txt}
        """

def gatherForD_fromC_basedOnA(wildcards):
        checkpoint_output = checkpoints.A.get(**wildcards).output.mydir
        # That will raise if not ready.

        ivals = glob_wildcards(os.path.join(checkpoint_output,
                        "{i}.txt")).i
        print("ivals={}".format(ivals))
        return expand("C/{i}.txt", i=ivals)

rule D:
        output:
                combined = "D/all.txt",
        input:
                gathered = gatherForD_fromC_basedOnA,
        shell: """
                mkdir -p D
                cat {input.gathered} > {output.combined}
        """

复制到 snakefile 并用

运行
snakemake --verbose -p
  • Checkpoint/Rule A 输出随机数量的文件。 (当然,您可以将它们基于 "input" 部分。)
  • 规则BC是使用标准snakemake"wildcards".
  • 的并行规则
  • 规则 D 基于 input-generating 函数接受未知数量的输入。
  • 函数gatherForD_fromC_basedOnA等待checkpoint-ruleA的输出,但它命名规则C的输出,最终被规则 D 消耗掉。结果,snakemake 知道 D 将消耗什么(在 A 完成后)。所以它知道 C 必须产生什么。所以它知道 B 必须产生什么。
  • 最后,规则 finish 等待特定的已知文件。