Azure 数据工厂:从副本中输出复制的文件和文件夹信息 Activity

Azure Data Factory: Output Copied File and Folder Information from Copy Activity

我正在使用 Azure 数据工厂中的自托管集成运行时将数据从本地源(普通文件系统)复制到 Azure Blob 存储目标。传输后,我想通过在 Databricks 集群上附加笔记本 运行ning 来自动处理文件。管道工作正常,但我的问题涉及 Copy Activity.

的输出

有没有办法获取每个 运行 的传输文件和文件夹的信息? 我会将此信息作为参数传递给笔记本。

查看文档,似乎只有聚合信息可用:

https://docs.microsoft.com/en-us/azure/data-factory/copy-activity-overview

如果您传输大量文件,哪种方式有意义。如果不可能,我想另一种方法是将复制过程留给自己,并根据存储帐户事件创建另一个管道?或者可以将每个 运行 的新文件和文件夹信息存储在一个固定的文本文件中,也将其传输并在笔记本中阅读?

如果您想获取正在从数据工厂读取的文件或目录的信息,可以使用获取元数据 Activity 来完成,请参阅以下 示例。

检测笔记本中新文件的另一种方法是对文件源使用结构化流。这工作得很好,你只需在副本 activity 之后调用笔记本 activity。

为此,您定义了一个流式输入数据帧:

streamingInputDF = (
   spark
     .readStream                     
     .schema(pqtSchema)               
     .parquet(inputPath) 
 )

inputPath 指向 Blob 存储中的输入目录。支持的文件格式有文本、csv、json、orc、parquet,所以这是否适合你取决于你的具体场景。

重要的是,您在目标上使用触发一次选项,因此笔记本不需要 运行 永久性,例如。 g.:

streamingOutputDF \
    .repartition(1) \
    .writeStream \
    .format("parquet") \
    .partitionBy('Id') \
    .option("checkpointLocation", adlpath +  "spark/checkpointlocation/data/trusted/sensorreadingsdelta") \
    .option("path", targetPath + "delta") \
    .trigger(once=True) \
    .start()

另一种方法可以使用 Azure 队列存储 (AQS),请参阅以下 documentation

在这种情况下,解决方案实际上非常简单。我刚刚在 Azure 数据工厂中创建了另一个管道,它由 Blob Created 事件以及 folderfilename 作为参数传递到我的笔记本。似乎运行良好,并且需要最少的配置或代码。基本的过滤可以用事件完成,剩下的就看笔记本了。

对于遇到这种情况的任何其他人,详情如下:

https://docs.microsoft.com/en-us/azure/data-factory/how-to-create-event-trigger