根据确认文件从 gen2 ADLS 复制文件

Copy files from gen2 ADLS based on acknowledgement file

我正在尝试使用数据工厂管道将数据从 gen2 ADLS 复制到另一个 ADLS。 此管道每天 运行s 并且仅复制该特定日期的数据。这是通过在副本 activity.

中提供开始和结束时间来完成的

有时源 ADLS 中的文件会延迟,因此管道会 运行,但不会复制任何数据。 为了跟踪这一点,我们计划在将数据复制到源 ADLS 后保留一个确认文件,以便在复制之前我们可以检查 ack 文件并仅在 ack 文件存在时才进行数据复制。

所以检查应该每 10 分钟进行一次如果 ack 文件不存在,这个检查应该在 10 分钟后 运行 并且应该持续 2 小时。 在这 2 小时内,如果文件存在,则数据复制应继续进行,检查任务也应停止。 如果 2 小时后没有数据,则作业应该失败。

我正在尝试在 ADF 中执行验证任务。但一个问题是文件夹名称,因为我的文件夹将以数据和创建时间戳命名(例如:2021-03-30-02-19-33)。 在提供文件夹名称时,我必须排除文件夹的时间戳部分。 这怎么可能。是否接受通配符路径进行验证 activity?

任何线索如何实现这个?

有什么办法可以在get matadata任务中实现10分钟2小时后连续检查?我们可以使用获取元数据任务来实现上述场景吗?

据我所知,ADF validationmetadata 活动不支持 folder/file 路径的通配符路径。

如果我们确实需要使用通配符路径,我们必须在笔记本文件上编写 Scala/Python.. 脚本并从 ADF 执行。

我在下面使用了从 ADF 获取输入参数的 scala 脚本。

import java.io.File
import java.util.Calendar

dbutils.widgets.text("mainFolderPath", "","")
dbutils.widgets.text("finalFolderStartName", "","")
dbutils.widgets.text("fileName", "","")
dbutils.widgets.text("noOfTry", "1","")
val mainFolderPath = dbutils.widgets.get("mainFolderPath")
val finalFolderStartName = dbutils.widgets.get("finalFolderStartName")
val fileName = dbutils.widgets.get("fileName")
val noOfTry = (dbutils.widgets.get("noOfTry")).toInt

println("Main folder path : " + mainFolderPath)
println("Final folder start name : " + finalFolderStartName)
println("File name to be checked : " + fileName)
println("Number of tries with a gap of 1 mins : " + noOfTry)

if(mainFolderPath == "" || finalFolderStartName == "" || fileName == ""){
  dbutils.notebook.exit("Please pass input parameters and rerun!")
}

def getListOfSubDirectories(directoryName: String): Array[String] = {
    (new File(directoryName))
        .listFiles
        .filter(_.isDirectory)
        .map(_.getName)   
}

var counter = 0
var folderFound = false
var fileFound = false

try{
  while (counter < noOfTry && !fileFound) {
    val folders = getListOfSubDirectories(mainFolderPath)
    if(folders.exists(firstName => firstName.startsWith(finalFolderStartName))){
      folders.foreach(fol => {
        if(fol.startsWith(finalFolderStartName)){
          val finalPath = mainFolderPath + "/" + fol + "/" + fileName
          println("Final file path : " + finalPath)
          folderFound = true
          if(new File(finalPath).exists) {
            fileFound = true
          }else{
            println("found the final folder but no file found!")
            println("waiting for 10 mins! " + Calendar.getInstance().getTime())
            counter = counter+1
            Thread.sleep(1*60*1000)
          }
        }
      })
    }else{
    println("folder does not exists with name : " + mainFolderPath + "/" + finalFolderStartName + "*")
    println("waiting for 10 mins! " + Calendar.getInstance().getTime())
    counter = counter+1
    Thread.sleep(1*60*1000)
    }
  }
}catch{
  case e : Throwable =>  throw e;
}

if(folderFound && fileFound){
  println("File Exists!")
}else{
  throw new Exception("File does not exists!");
}