在我的 Scala 代码中每次重试之前添加等待时间

add wait period before each retry in my scala code

我有一个 spark 连接器笔记本,“将表导出到数据库”,它将 spark table 数据写入 Azure SQL 数据库。我有一个主笔记本,它调用那个 spark 连接器笔记本来并行编写许多 table。如果副本失败,我在主笔记本中有一个重试部分,用于重试导出。但是,它会导致我的数据库中出现重复项,因为最初失败的数据库不会立即取消连接。我想在每次重试之前添加一个等待时间。我该怎么做?

////these next four class and functions are for exporting  data directly to the Azure SQL database via the spark connectors. 
// the next two functions are for retry purpose. if exporting a table faile, it will retry
def tryNotebookRun (path: String, timeout: Int, parameters: Map[String, String] = Map.empty[String, String]): Try[Any] = {
  Try(
    if (parameters.nonEmpty){
      dbutils.notebook.run(path, timeout, parameters)
    }
    else{
      dbutils.notebook.run(path, timeout)
    }
  )
}

def runWithRetry(path: String, timeout: Int, parameters: Map[String, String] = Map.empty[String, String], maxRetries: Int = 3) = {
  var numRetries = 0
  
  // I want to add a wait period here
  while (numRetries < maxRetries){
    
    tryNotebookRun(path, timeout, parameters) match {
      case Success(_) => numRetries = maxRetries
      case Failure(_) => numRetries = numRetries + 1      
    }    
  }
}

case class NotebookData(path: String, timeout: Int, parameters: Map[String, String] = Map.empty[String, String])

def parallelNotebooks(notebooks: Seq[NotebookData]): Future[Seq[Any]] = {

  val numNotebooksInParallel = 5
  // This code limits the number of parallel notebooks.
  implicit val ec = ExecutionContext.fromExecutor(Executors.newFixedThreadPool(numNotebooksInParallel))
  val ctx = dbutils.notebook.getContext()
  
  Future.sequence(
    notebooks.map { notebook => 
      Future {
        dbutils.notebook.setContext(ctx)
        runWithRetry(notebook.path, notebook.timeout, notebook.parameters)
      }
      .recover {
        case NonFatal(e) => s"ERROR: ${e.getMessage}"
      }
    }
  )
}

////create a sequence of tables to be writed out in parallel

val notebooks = Seq(
  
  NotebookData("Export Tables To Database", 0, Map("client"->client, "scope"->scope, "secret"->secret, "schema"->"test", "dbTable"->"table1")),
  NotebookData("Export Tables To Database", 0, Map("client"->client, "scope"->scope, "secret"->secret, "schema"->"test", "dbTable"->"table2"))
)

val res = parallelNotebooks(notebooks)

Await.result(res, 3000000 seconds) // this is a blocking call.
res.value

添加 Thread.sleep 是解决方案

def runWithRetry(path: String, timeout: Int, parameters: Map[String, String] = Map.empty[String, String], maxRetries: Int = 2) = {
  var numRetries = 0
  while (numRetries < maxRetries){
    tryNotebookRun(path, timeout, parameters) match {
      case Success(_) => numRetries = maxRetries
      case Failure(_) => {
        Thread.sleep(30000)
        numRetries = numRetries + 1
      }
    }    
  }
}