在scala代码中添加每次重试前的等待期

hjqgdpho  于 2021-05-27  发布在  Spark
关注(0)|答案(1)|浏览(414)

我有一个spark连接器笔记本,“将表导出到数据库”,它将spark表数据写入azuresql数据库。我有一个主笔记本,称之为Spark连接器笔记本写许多表并行。如果复制失败,我在主笔记本中有一个重试部分,用于重试导出。但是,它在我的数据库中造成了重复,因为原来失败的数据库不会立即取消连接。我想在每次重试之前添加一个等待期。我该怎么做?

////these next four class and functions are for exporting  data directly to the Azure SQL database via the spark connectors. 
// the next two functions are for retry purpose. if exporting a table faile, it will retry
def tryNotebookRun (path: String, timeout: Int, parameters: Map[String, String] = Map.empty[String, String]): Try[Any] = {
  Try(
    if (parameters.nonEmpty){
      dbutils.notebook.run(path, timeout, parameters)
    }
    else{
      dbutils.notebook.run(path, timeout)
    }
  )
}

def runWithRetry(path: String, timeout: Int, parameters: Map[String, String] = Map.empty[String, String], maxRetries: Int = 3) = {
  var numRetries = 0

  // I want to add a wait period here
  while (numRetries < maxRetries){

    tryNotebookRun(path, timeout, parameters) match {
      case Success(_) => numRetries = maxRetries
      case Failure(_) => numRetries = numRetries + 1      
    }    
  }
}

case class NotebookData(path: String, timeout: Int, parameters: Map[String, String] = Map.empty[String, String])

def parallelNotebooks(notebooks: Seq[NotebookData]): Future[Seq[Any]] = {

  val numNotebooksInParallel = 5
  // This code limits the number of parallel notebooks.
  implicit val ec = ExecutionContext.fromExecutor(Executors.newFixedThreadPool(numNotebooksInParallel))
  val ctx = dbutils.notebook.getContext()

  Future.sequence(
    notebooks.map { notebook => 
      Future {
        dbutils.notebook.setContext(ctx)
        runWithRetry(notebook.path, notebook.timeout, notebook.parameters)
      }
      .recover {
        case NonFatal(e) => s"ERROR: ${e.getMessage}"
      }
    }
  )
}

////create a sequence of tables to be writed out in parallel

val notebooks = Seq(

  NotebookData("Export Tables To Database", 0, Map("client"->client, "scope"->scope, "secret"->secret, "schema"->"test", "dbTable"->"table1")),
  NotebookData("Export Tables To Database", 0, Map("client"->client, "scope"->scope, "secret"->secret, "schema"->"test", "dbTable"->"table2"))
)

val res = parallelNotebooks(notebooks)

Await.result(res, 3000000 seconds) // this is a blocking call.
res.value
jbose2ul

jbose2ul1#

添加thread.sleep是解决方案

def runWithRetry(path: String, timeout: Int, parameters: Map[String, String] = Map.empty[String, String], maxRetries: Int = 2) = {
  var numRetries = 0
  while (numRetries < maxRetries){
    tryNotebookRun(path, timeout, parameters) match {
      case Success(_) => numRetries = maxRetries
      case Failure(_) => {
        Thread.sleep(30000)
        numRetries = numRetries + 1
      }
    }    
  }
}

相关问题