Azure 数据工厂:从副本中输出复制的文件和文件夹信息 Activity
Azure Data Factory: Output Copied File and Folder Information from Copy Activity
我正在使用 Azure 数据工厂中的自托管集成运行时将数据从本地源(普通文件系统)复制到 Azure Blob 存储目标。传输后,我想通过在 Databricks 集群上附加笔记本 运行ning 来自动处理文件。管道工作正常,但我的问题涉及 Copy Activity.
的输出
有没有办法获取每个 运行 的传输文件和文件夹的信息? 我会将此信息作为参数传递给笔记本。
查看文档,似乎只有聚合信息可用:
https://docs.microsoft.com/en-us/azure/data-factory/copy-activity-overview
如果您传输大量文件,哪种方式有意义。如果不可能,我想另一种方法是将复制过程留给自己,并根据存储帐户事件创建另一个管道?或者可以将每个 运行 的新文件和文件夹信息存储在一个固定的文本文件中,也将其传输并在笔记本中阅读?
如果您想获取正在从数据工厂读取的文件或目录的信息,可以使用获取元数据 Activity 来完成,请参阅以下 示例。
检测笔记本中新文件的另一种方法是对文件源使用结构化流。这工作得很好,你只需在副本 activity 之后调用笔记本 activity。
为此,您定义了一个流式输入数据帧:
streamingInputDF = (
spark
.readStream
.schema(pqtSchema)
.parquet(inputPath)
)
inputPath 指向 Blob 存储中的输入目录。支持的文件格式有文本、csv、json、orc、parquet,所以这是否适合你取决于你的具体场景。
重要的是,您在目标上使用触发一次选项,因此笔记本不需要 运行 永久性,例如。 g.:
streamingOutputDF \
.repartition(1) \
.writeStream \
.format("parquet") \
.partitionBy('Id') \
.option("checkpointLocation", adlpath + "spark/checkpointlocation/data/trusted/sensorreadingsdelta") \
.option("path", targetPath + "delta") \
.trigger(once=True) \
.start()
另一种方法可以使用 Azure 队列存储 (AQS),请参阅以下 documentation。
在这种情况下,解决方案实际上非常简单。我刚刚在 Azure 数据工厂中创建了另一个管道,它由 Blob Created 事件以及 folder 和 filename 作为参数传递到我的笔记本。似乎运行良好,并且需要最少的配置或代码。基本的过滤可以用事件完成,剩下的就看笔记本了。
对于遇到这种情况的任何其他人,详情如下:
https://docs.microsoft.com/en-us/azure/data-factory/how-to-create-event-trigger
我正在使用 Azure 数据工厂中的自托管集成运行时将数据从本地源(普通文件系统)复制到 Azure Blob 存储目标。传输后,我想通过在 Databricks 集群上附加笔记本 运行ning 来自动处理文件。管道工作正常,但我的问题涉及 Copy Activity.
的输出有没有办法获取每个 运行 的传输文件和文件夹的信息? 我会将此信息作为参数传递给笔记本。
查看文档,似乎只有聚合信息可用:
https://docs.microsoft.com/en-us/azure/data-factory/copy-activity-overview
如果您传输大量文件,哪种方式有意义。如果不可能,我想另一种方法是将复制过程留给自己,并根据存储帐户事件创建另一个管道?或者可以将每个 运行 的新文件和文件夹信息存储在一个固定的文本文件中,也将其传输并在笔记本中阅读?
如果您想获取正在从数据工厂读取的文件或目录的信息,可以使用获取元数据 Activity 来完成,请参阅以下
检测笔记本中新文件的另一种方法是对文件源使用结构化流。这工作得很好,你只需在副本 activity 之后调用笔记本 activity。
为此,您定义了一个流式输入数据帧:
streamingInputDF = (
spark
.readStream
.schema(pqtSchema)
.parquet(inputPath)
)
inputPath 指向 Blob 存储中的输入目录。支持的文件格式有文本、csv、json、orc、parquet,所以这是否适合你取决于你的具体场景。
重要的是,您在目标上使用触发一次选项,因此笔记本不需要 运行 永久性,例如。 g.:
streamingOutputDF \
.repartition(1) \
.writeStream \
.format("parquet") \
.partitionBy('Id') \
.option("checkpointLocation", adlpath + "spark/checkpointlocation/data/trusted/sensorreadingsdelta") \
.option("path", targetPath + "delta") \
.trigger(once=True) \
.start()
另一种方法可以使用 Azure 队列存储 (AQS),请参阅以下 documentation。
在这种情况下,解决方案实际上非常简单。我刚刚在 Azure 数据工厂中创建了另一个管道,它由 Blob Created 事件以及 folder 和 filename 作为参数传递到我的笔记本。似乎运行良好,并且需要最少的配置或代码。基本的过滤可以用事件完成,剩下的就看笔记本了。
对于遇到这种情况的任何其他人,详情如下:
https://docs.microsoft.com/en-us/azure/data-factory/how-to-create-event-trigger