从 azure blob 插入数据并根据 blob 的名称插入到 Azure 数据工厂中的某些 table
inserting data from azure blob and depending on the name of the blob insert into certain table in Azure Data Factory
基本上我的问题是这样的,我将使用元数据从 Azure 存储帐户的源文件夹中获取文件的名称。我需要解析该名称并将其插入相应的 table。下面的示例。
文件名将采用这种格式。 customer_GUIID_TypeOfData_other information.csv
即 1c56d6s4s33s4_Sales_09112021.csv
156468a5s5s54_Inventory_08022021.csv
所以这是 2 个不同的客户和两种不同类型的信息。
SQL 中的 table 就是没有日期的那个。 156468a5s5s54_Inventory 或 1c56d6s4s33s4_Sales
如何根据文件名将数据从 CSV 复制到相应的 table?我还需要使用 AZURE 数据工厂根据文件数据集中的唯一标识符插入或更新目标 table 中的现有行。
使用 Get Metadata
activity 获取文件名并将数据从 CSV 复制到 Azure SQL table 使用 Dataflow
activity 和 Upsert启用。
输入 blob 文件:
步骤1:
• 创建一个定界符 源数据集。为文件名创建一个参数以动态传递它。
• 创建 Azure SQL 数据库 接收数据集并创建参数以动态传递 table 名称。
源数据集:
接收器数据集:
步骤2:
• 连接源数据集以获取元数据activity 并传递“*.csv”作为文件名以从 blob 文件夹中获取所有文件名的列表。
Get Metada 的输出:
第三步:
• 将 Get Metadata
activity 的输出连接到 ForEach 循环,以将所有传入的 Source files/data 加载到 Sink。
• 向项目添加表达式以获取先前 activity 的子项目。
@activity('Get Metadata1').output.childitems
Step4:
• 在 foreach 循环内添加 dataflow
activity。
• 将源连接到源数据集。
数据流源:
步骤5:
• 将接收器连接到接收器数据集。
• 启用允许更新插入 如果记录基于唯一键列存在则更新。
Step6:
• 在源和接收器之间添加 AlterRow
以添加更新插入的条件。
• 当唯一键列不为空或找到时更新。
Upsert if: isNull(id)==false()
步骤7:
• 在ForEach
循环中,dataflow 设置,动态添加源文件名和接收器table 名称的表达式。
Src_file_name: @item().name
• 当我们从源文件名中提取接收器 table 名称时。根据下划线“_”拆分文件名,然后合并1st 2字符串以消除日期部分。
Sink_tbname: @concat(split(item().name, '_')[0],'_',split(item().name, '_')[1])
步骤8:
当管道为运行时,您可以看到循环执行了blob中的源文件数量,并根据文件名将数据加载到相应的tables。
基本上我的问题是这样的,我将使用元数据从 Azure 存储帐户的源文件夹中获取文件的名称。我需要解析该名称并将其插入相应的 table。下面的示例。
文件名将采用这种格式。 customer_GUIID_TypeOfData_other information.csv 即 1c56d6s4s33s4_Sales_09112021.csv 156468a5s5s54_Inventory_08022021.csv
所以这是 2 个不同的客户和两种不同类型的信息。
SQL 中的 table 就是没有日期的那个。 156468a5s5s54_Inventory 或 1c56d6s4s33s4_Sales
如何根据文件名将数据从 CSV 复制到相应的 table?我还需要使用 AZURE 数据工厂根据文件数据集中的唯一标识符插入或更新目标 table 中的现有行。
使用 Get Metadata
activity 获取文件名并将数据从 CSV 复制到 Azure SQL table 使用 Dataflow
activity 和 Upsert启用。
输入 blob 文件:
步骤1:
• 创建一个定界符 源数据集。为文件名创建一个参数以动态传递它。
• 创建 Azure SQL 数据库 接收数据集并创建参数以动态传递 table 名称。
源数据集:
接收器数据集:
步骤2:
• 连接源数据集以获取元数据activity 并传递“*.csv”作为文件名以从 blob 文件夹中获取所有文件名的列表。
Get Metada 的输出:
第三步:
• 将 Get Metadata
activity 的输出连接到 ForEach 循环,以将所有传入的 Source files/data 加载到 Sink。
• 向项目添加表达式以获取先前 activity 的子项目。
@activity('Get Metadata1').output.childitems
Step4:
• 在 foreach 循环内添加 dataflow
activity。
• 将源连接到源数据集。
数据流源:
步骤5:
• 将接收器连接到接收器数据集。
• 启用允许更新插入 如果记录基于唯一键列存在则更新。
Step6:
• 在源和接收器之间添加 AlterRow
以添加更新插入的条件。
• 当唯一键列不为空或找到时更新。
Upsert if: isNull(id)==false()
步骤7:
• 在ForEach
循环中,dataflow 设置,动态添加源文件名和接收器table 名称的表达式。
Src_file_name: @item().name
• 当我们从源文件名中提取接收器 table 名称时。根据下划线“_”拆分文件名,然后合并1st 2字符串以消除日期部分。
Sink_tbname:
@concat(split(item().name, '_')[0],'_',split(item().name, '_')[1])
步骤8:
当管道为运行时,您可以看到循环执行了blob中的源文件数量,并根据文件名将数据加载到相应的tables。