使用 Azure 数据工厂进行多步增量加载和处理
Multi Step Incremental load and processing using Azure Data Factory
我想实现增量 load/processing 并在处理后使用 Azure 数据工厂 将它们存储在不同的地方,例如:
外部数据源(数据结构化)-> ADLS(原始)-> ADLS(已处理) -> SQL DB
因此,我需要根据当前日期从源中提取 raw 数据的样本,将它们存储在 ADLS 容器中,然后处理相同的样本数据,将它们存储在另一个ADLS容器中,最后将处理后的结果追加到一个SQL DB.
ADLS 原始:
2022-03-01.txt
2022-03-02.txt
已处理 ADLS:
2022-03-01-processed.txt
2022-03-02-processed.txt
SQL数据库:
ADLS 处理容器中的所有txt 文件将附加并存储在SQL DB.
中
因此想检查在必须 运行 分批处理的单个管道中实现此目的的最佳方法是什么?
您可以使用动态管道实现此目的,如下所示:
在 SQL 数据库中创建配置/元数据 table,您可以在其中放置源 table 名称、源名称等详细信息
创建管道如下:
a) 添加查找 activity,您可以在其中根据您的配置创建查询 table
https://docs.microsoft.com/en-us/azure/data-factory/control-flow-lookup-activity
b) 添加一个 ForEach activity 并使用 Lookup 输出作为 ForEach 的输入
https://docs.microsoft.com/en-us/azure/data-factory/control-flow-for-each-activity
c) 在 ForEach 中你可以添加一个 switch activity,其中每个 Switch case 区分 table 或 source
d) 在每种情况下添加您需要在 RAW 层中创建文件的 COPY 或其他活动
e) 在您的管道中为处理层添加另一个 ForEach,您可以在其中添加与为 RAW 层所做的类似类型的内部活动,并且在此 activity 中您可以添加处理逻辑
通过这种方式,您可以创建单个管道,也可以创建一个动态管道,它可以对所有源执行必要的操作
您不能一次重命名多个文件,因此您必须一个接一个地复制文件。
- 创建一个 pipeline with tumbling window trigger - 在名为 WindowStartTime 和 WindowEndTime[的触发器和管道中创建两个 parameters =30=]
- 创建一个 GetMetaData activity 使用参数 last modified datetime 并传递 WindowStartTime 和 WindowEndTime 以获取放置在 WindowStartTime 和 WindowEndTime 之间的文件列表
- 创建一个ForEach activity传递从Getmetadata
接收到的数据
- 在内部为 activity 创建副本 activity 并从 ForEach 循环传递文件名
- 在接收器数据集中传递文件名并连接“_processed/txt”
- 创建一个 Copy activity 在每个 activity 以源作为处理层再次传递 WindowStartTime 和 WindowEndTime
- 此复制 activity 将读取当天收到的最新文件并将其附加到SQL DB
我想实现增量 load/processing 并在处理后使用 Azure 数据工厂 将它们存储在不同的地方,例如:
外部数据源(数据结构化)-> ADLS(原始)-> ADLS(已处理) -> SQL DB
因此,我需要根据当前日期从源中提取 raw 数据的样本,将它们存储在 ADLS 容器中,然后处理相同的样本数据,将它们存储在另一个ADLS容器中,最后将处理后的结果追加到一个SQL DB.
ADLS 原始:
2022-03-01.txt
2022-03-02.txt
已处理 ADLS:
2022-03-01-processed.txt
2022-03-02-processed.txt
SQL数据库:
ADLS 处理容器中的所有txt 文件将附加并存储在SQL DB.
中因此想检查在必须 运行 分批处理的单个管道中实现此目的的最佳方法是什么?
您可以使用动态管道实现此目的,如下所示:
在 SQL 数据库中创建配置/元数据 table,您可以在其中放置源 table 名称、源名称等详细信息
创建管道如下:
a) 添加查找 activity,您可以在其中根据您的配置创建查询 table https://docs.microsoft.com/en-us/azure/data-factory/control-flow-lookup-activity
b) 添加一个 ForEach activity 并使用 Lookup 输出作为 ForEach 的输入 https://docs.microsoft.com/en-us/azure/data-factory/control-flow-for-each-activity
c) 在 ForEach 中你可以添加一个 switch activity,其中每个 Switch case 区分 table 或 source
d) 在每种情况下添加您需要在 RAW 层中创建文件的 COPY 或其他活动
e) 在您的管道中为处理层添加另一个 ForEach,您可以在其中添加与为 RAW 层所做的类似类型的内部活动,并且在此 activity 中您可以添加处理逻辑
通过这种方式,您可以创建单个管道,也可以创建一个动态管道,它可以对所有源执行必要的操作
您不能一次重命名多个文件,因此您必须一个接一个地复制文件。
- 创建一个 pipeline with tumbling window trigger - 在名为 WindowStartTime 和 WindowEndTime[的触发器和管道中创建两个 parameters =30=]
- 创建一个 GetMetaData activity 使用参数 last modified datetime 并传递 WindowStartTime 和 WindowEndTime 以获取放置在 WindowStartTime 和 WindowEndTime 之间的文件列表
- 创建一个ForEach activity传递从Getmetadata 接收到的数据
- 在内部为 activity 创建副本 activity 并从 ForEach 循环传递文件名
- 在接收器数据集中传递文件名并连接“_processed/txt”
- 创建一个 Copy activity 在每个 activity 以源作为处理层再次传递 WindowStartTime 和 WindowEndTime
- 此复制 activity 将读取当天收到的最新文件并将其附加到SQL DB