使用 Azure 数据工厂进行多步增量加载和处理

Multi Step Incremental load and processing using Azure Data Factory

我想实现增量 load/processing 并在处理后使用 Azure 数据工厂 将它们存储在不同的地方,例如:

外部数据源(数据结构化)-> ADLS(原始)-> ADLS(已处理) -> SQL DB

因此,我需要根据当前日期从源中提取 raw 数据的样本,将它们存储在 ADLS 容器中,然后处理相同的样本数据,将它们存储在另一个ADLS容器中,最后将处理后的结果追加到一个SQL DB.

ADLS 原始:

2022-03-01.txt

2022-03-02.txt

已处理 ADLS:

2022-03-01-processed.txt

2022-03-02-processed.txt

SQL数据库:

ADLS 处理容器中的所有txt 文件将附加并存储在SQL DB.

因此想检查在必须 运行 分批处理的单个管道中实现此目的的最佳方法是什么?

您可以使用动态管道实现此目的,如下所示:

  1. 在 SQL 数据库中创建配置/元数据 table,您可以在其中放置源 table 名称、源名称等详细信息

  2. 创建管道如下:

    a) 添加查找 activity,您可以在其中根据您的配置创建查询 table https://docs.microsoft.com/en-us/azure/data-factory/control-flow-lookup-activity

    b) 添加一个 ForEach activity 并使用 Lookup 输出作为 ForEach 的输入 https://docs.microsoft.com/en-us/azure/data-factory/control-flow-for-each-activity

    c) 在 ForEach 中你可以添加一个 switch activity,其中每个 Switch case 区分 table 或 source

    d) 在每种情况下添加您需要在 RAW 层中创建文件的 COPY 或其他活动

    e) 在您的管道中为处理层添加另一个 ForEach,您可以在其中添加与为 RAW 层所做的类似类型的内部活动,并且在此 activity 中您可以添加处理逻辑

通过这种方式,您可以创建单个管道,也可以创建一个动态管道,它可以对所有源执行必要的操作

您不能一次重命名多个文件,因此您必须一个接一个地复制文件。

  • 创建一个 pipeline with tumbling window trigger - 在名为 WindowStartTime 和 WindowEndTime[的触发器和管道中创建两个 parameters =30=]
  • 创建一个 GetMetaData activity 使用参数 last modified datetime 并传递 WindowStartTime 和 WindowEndTime 以获取放置在 WindowStartTime 和 WindowEndTime 之间的文件列表
  • 创建一个ForEach activity传递从Getmetadata
  • 接收到的数据
  • 在内部为 activity 创建副本 activity 并从 ForEach 循环传递文件名
  • 在接收器数据集中传递文件名并连接“_processed/txt”
  • 创建一个 Copy activity 在每个 activity 以源作为处理层再次传递 WindowStartTime 和 WindowEndTime
  • 复制 activity 将读取当天收到的最新文件并将其附加到SQL DB