我使用databricks本身的以下代码来说明如何在scala中并行运行它的笔记本,https://docs.databricks.com/notebooks/notebook-workflows.html#run-同时使用多个笔记本。我正在尝试添加重试功能,如果序列中的一个笔记本失败,它将根据我传递给它的重试值重试该笔记本。
下面是databricks的并行笔记本代码:
//parallel notebook code
import scala.concurrent.{Future, Await}
import scala.concurrent.duration._
import scala.util.control.NonFatal
case class NotebookData(path: String, timeout: Int, parameters: Map[String, String] = Map.empty[String, String])
def parallelNotebooks(notebooks: Seq[NotebookData]): Future[Seq[String]] = {
import scala.concurrent.{Future, blocking, Await}
import java.util.concurrent.Executors
import scala.concurrent.ExecutionContext
import com.databricks.WorkflowException
val numNotebooksInParallel = 5
// If you create too many notebooks in parallel the driver may crash when you submit all of the jobs at once.
// This code limits the number of parallel notebooks.
implicit val ec = ExecutionContext.fromExecutor(Executors.newFixedThreadPool(numNotebooksInParallel))
val ctx = dbutils.notebook.getContext()
Future.sequence(
notebooks.map { notebook =>
Future {
dbutils.notebook.setContext(ctx)
if (notebook.parameters.nonEmpty)
dbutils.notebook.run(notebook.path, notebook.timeout, notebook.parameters)
else
dbutils.notebook.run(notebook.path, notebook.timeout)
}
.recover {
case NonFatal(e) => s"ERROR: ${e.getMessage}"
}
}
)
}
这是我如何调用上述代码来运行多个示例的示例:
import scala.concurrent.Await
import scala.concurrent.duration._
import scala.language.postfixOps
val notebooks = Seq(
NotebookData("Notebook1", 0, Map("client"->client)),
NotebookData("Notebook2", 0, Map("client"->client))
)
val res = parallelNotebooks(notebooks)
Await.result(res, 3000000 seconds) // this is a blocking call.
res.value
2条答案
按热度按时间dphi5xsq1#
这里有一个尝试。因为您的代码没有编译,所以我插入了几个伪类。
另外,您没有完全指定所需的行为,所以我做了一些假设。每次连接只重试五次。如果在五次重试之后,任何一个未来仍然失败,那么整个未来都失败了。这两种行为都可以更改,但由于您没有指定,我不确定您想要的是什么。
如果您有问题或想让我对程序进行修改,请在评论部分告诉我。
cs7cruho2#
我发现这个有用: