Add a wait period before each retry in my Scala code
I have a Spark connector notebook, "Export Tables To Database", which writes Spark table data to an Azure SQL database. I have a master notebook that calls that Spark connector notebook to write many tables in parallel. If a copy fails, I have a retry section in the master notebook that retries the export. However, it causes duplicates in my database, because the initially failed connection does not cancel right away. I want to add a wait period before each retry. How can I do that?
//// The next four definitions (one case class and three functions) export data directly to the Azure SQL database via the Spark connectors.
// The next two functions handle retries: if exporting a table fails, it will retry.
import java.util.concurrent.Executors
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._
import scala.util.{Failure, Success, Try}
import scala.util.control.NonFatal

def tryNotebookRun(path: String, timeout: Int, parameters: Map[String, String] = Map.empty[String, String]): Try[Any] = {
  Try(
    if (parameters.nonEmpty) {
      dbutils.notebook.run(path, timeout, parameters)
    } else {
      dbutils.notebook.run(path, timeout)
    }
  )
}
def runWithRetry(path: String, timeout: Int, parameters: Map[String, String] = Map.empty[String, String], maxRetries: Int = 3) = {
  var numRetries = 0
  // I want to add a wait period here
  while (numRetries < maxRetries) {
    tryNotebookRun(path, timeout, parameters) match {
      case Success(_) => numRetries = maxRetries // stop looping on success
      case Failure(_) => numRetries = numRetries + 1
    }
  }
}
case class NotebookData(path: String, timeout: Int, parameters: Map[String, String] = Map.empty[String, String])

def parallelNotebooks(notebooks: Seq[NotebookData]): Future[Seq[Any]] = {
  val numNotebooksInParallel = 5
  // This code limits the number of parallel notebooks.
  implicit val ec = ExecutionContext.fromExecutor(Executors.newFixedThreadPool(numNotebooksInParallel))
  val ctx = dbutils.notebook.getContext()
  Future.sequence(
    notebooks.map { notebook =>
      Future {
        dbutils.notebook.setContext(ctx)
        runWithRetry(notebook.path, notebook.timeout, notebook.parameters)
      }
        .recover {
          case NonFatal(e) => s"ERROR: ${e.getMessage}"
        }
    }
  )
}
//// Create a sequence of tables to be written out in parallel.
val notebooks = Seq(
  NotebookData("Export Tables To Database", 0, Map("client"->client, "scope"->scope, "secret"->secret, "schema"->"test", "dbTable"->"table1")),
  NotebookData("Export Tables To Database", 0, Map("client"->client, "scope"->scope, "secret"->secret, "schema"->"test", "dbTable"->"table2"))
)
val res = parallelNotebooks(notebooks)
Await.result(res, 3000000 seconds) // this is a blocking call
res.value
Adding a Thread.sleep before each retry is the solution:
def runWithRetry(path: String, timeout: Int, parameters: Map[String, String] = Map.empty[String, String], maxRetries: Int = 2) = {
  var numRetries = 0
  while (numRetries < maxRetries) {
    tryNotebookRun(path, timeout, parameters) match {
      case Success(_) => numRetries = maxRetries // stop looping on success
      case Failure(_) => {
        Thread.sleep(30000) // wait 30 seconds so the failed connection can be released before the next attempt
        numRetries = numRetries + 1
      }
    }
  }
}
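
If a fixed 30-second pause is not always long enough for the failed connection to clear, exponential backoff is a common refinement: double the wait after each failed attempt. Below is a minimal sketch assuming the same tryNotebookRun helper as above; runWithBackoff, initialDelayMs, and the doubling factor are illustrative choices, not part of the original solution. Thread.sleep only blocks the worker thread running that notebook's Future, so the other parallel exports keep going.

// A sketch of retry with exponential backoff (hypothetical variant, not the original solution).
def runWithBackoff(path: String,
                   timeout: Int,
                   parameters: Map[String, String] = Map.empty[String, String],
                   maxRetries: Int = 3,
                   initialDelayMs: Long = 30000L): Unit = {
  var attempt = 0
  var delayMs = initialDelayMs
  var succeeded = false
  while (!succeeded && attempt < maxRetries) {
    tryNotebookRun(path, timeout, parameters) match {
      case Success(_) => succeeded = true
      case Failure(_) =>
        attempt += 1
        if (attempt < maxRetries) {
          Thread.sleep(delayMs) // wait before the next retry; skipped after the final failure
          delayMs *= 2          // double the wait each time: 30s, 60s, 120s, ...
        }
    }
  }
}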