Azure Synapse、管道和 Azure Datalake

Azure Synapse, Pipeline and Azure Datalake

我有一个 ETL 管道,当文件上传到 Datalake 位置“Processing”文件夹时会触发,处理完数据后我想将文件移动到 Datalake 中的“Processed”文件夹中的不同位置,该怎么做那个?

如果您想将数据从数据湖中的处理文件夹移动到已处理文件夹,您可以使用存储事件触发器向您之前的副本添加额外的活动 activity。下面提供了类似方法的图像。

从上图中,'from_dl_to_db' 是一个副本 activity,用于将文件从 datalake 处理文件夹移动到 SQL 服务器 table。使用'to_processed'复制activity,我们可以将文件从处理文件夹移动到处理文件夹。最后,'delete_processing' 从处理文件夹中删除复制到 SQL 服务器 table 的文件,因为它在处理文件夹中可用。

如果您在上传前知道文件名,那么您可以在上传和触发管道之前在管道中手动更改名称。但是这个过程需要很多时间并且效率不高。由于这个原因,您必须使用 Get Metadata 来检索文件名。下面的文档演示了如何使用 GetMetadata activity 的示例,您可以按照该示例进行操作。

https://www.c-sharpcorner.com/article/extract-file-names-and-copy-from-source-path-in-azure-data-factory/

从这个文档中,您可以了解到我们必须使用 'child items' 因为我们正在尝试使用 datalake 中的处理文件夹,但我们不知道将上传到其中的文件的名称。创建一个 ForEach activity 以循环遍历处理文件夹的内容(遵循 link 中提供的相同过程)。

在 ForEach activity 中,导航到 Activities 选项卡为后续步骤创建活动。创建副本 activity 以将文件从数据湖移动到 SQL 服务器 table。对于源数据集,您可以按照 link 中提供的相同步骤进行操作。对于 Sink 数据集,为您的数据库创建一个 linked 服务。打开数据集并在 Parameters 选项卡下创建一个参数。移至 Connection 选项卡并在 table 名称字段中指定其值,如下所示。

并在您的管道接收器设置中检查 'Auto create table' 选项,然后为参数 'table_name' 提供值,如下所示。

创建另一个副本 activity 以将文件从处理中移动到数据湖中的已处理文件夹。在源(处理文件夹)下,像为“from_dl_to_db”activity 的源所做的那样设置参数值,并指定指向“已处理文件夹”的接收器。

创建最后的删除activity(删除是必须的,因为我们正在使用foreach,如果不删除它会再次复制到数据库中)。按照与复制活动的前两个源代码部分中相同的步骤使用动态参数。

由于您已经有了存储事件触发器,请使用它和 运行 管道。上传文件后,它会作为 table 复制到数据库,然后复制到已处理文件夹,表示已处理并从处理文件夹中删除。使用此方法的唯一缺点是将多个文件上传到处理文件夹(源)会触发每个文件的管道,管道可能会失败。因此,请尝试一次只上传一个文件,管道可以正常工作。

我的输出图像links:

上传文件到处理文件夹:https://i.stack.imgur.com/RqeAY.png

管道执行:https://i.stack.imgur.com/RnJWu.png

文件作为 table 复制到数据库:https://i.stack.imgur.com/HLaID.png

文件移动到已处理的文件夹:https://i.stack.imgur.com/Pn82O.png

从处理文件夹中删除的文件:https://i.stack.imgur.com/wHHgb.png