scala - Adding retries to a sequence of futures to run Databricks notebooks in parallel in Scala

Reposted. Author: 行者123. Updated: 2023-12-04 09:31:30

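I am using the following code from Databricks itself, which shows how to run its notebooks in parallel in Scala: https://docs.databricks.com/notebooks/notebook-workflows.html#run-multiple-notebooks-concurrently. I am trying to add retry functionality: if one of the notebooks in the sequence fails, it should retry that notebook up to the retry value I pass in.
Here is the parallel notebook code from Databricks: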

// parallel notebook code

import scala.concurrent.{Future, Await}
import scala.concurrent.duration._
import scala.util.control.NonFatal

case class NotebookData(path: String, timeout: Int, parameters: Map[String, String] = Map.empty[String, String])

def parallelNotebooks(notebooks: Seq[NotebookData]): Future[Seq[String]] = {
  import scala.concurrent.{Future, blocking, Await}
  import java.util.concurrent.Executors
  import scala.concurrent.ExecutionContext
  import com.databricks.WorkflowException

  val numNotebooksInParallel = 5
  // If you create too many notebooks in parallel the driver may crash when you submit all of the jobs at once.
  // This code limits the number of parallel notebooks.
  implicit val ec: ExecutionContext = ExecutionContext.fromExecutor(Executors.newFixedThreadPool(numNotebooksInParallel))
  val ctx = dbutils.notebook.getContext()

  Future.sequence(
    notebooks.map { notebook =>
      Future {
        dbutils.notebook.setContext(ctx)
        if (notebook.parameters.nonEmpty)
          dbutils.notebook.run(notebook.path, notebook.timeout, notebook.parameters)
        else
          dbutils.notebook.run(notebook.path, notebook.timeout)
      }
      .recover {
        case NonFatal(e) => s"ERROR: ${e.getMessage}"
      }
    }
  )
}
Here is how I call the code above to run several sample notebooks:
import scala.concurrent.Await
import scala.concurrent.duration._
import scala.language.postfixOps
val notebooks = Seq(
  NotebookData("Notebook1", 0, Map("client" -> client)),
  NotebookData("Notebook2", 0, Map("client" -> client))
)
val res = parallelNotebooks(notebooks)
Await.result(res, 3000000 seconds) // this is a blocking call.
res.value
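The comments in the Databricks code say the fixed thread pool is what caps how many notebooks run at once. The sketch below illustrates that outside Databricks, assuming a sleeping task (the hypothetical `fakeNotebook`) is a fair stand-in for `dbutils.notebook.run`: 20 tasks are submitted at once to a pool of 5 threads, and an atomic counter records the peak number running at the same time.

```scala
import java.util.concurrent.Executors
import java.util.concurrent.atomic.AtomicInteger
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

// Hypothetical stand-in for dbutils.notebook.run (dbutils only exists inside Databricks).
val running = new AtomicInteger(0) // tasks currently executing
val peak = new AtomicInteger(0)    // highest value `running` ever reached

def fakeNotebook(id: Int): String = {
  val now = running.incrementAndGet()
  peak.accumulateAndGet(now, (a, b) => math.max(a, b))
  Thread.sleep(50) // pretend to do some work
  running.decrementAndGet()
  s"notebook-$id done"
}

// Keep a handle on the pool so it can be shut down afterwards.
val pool = Executors.newFixedThreadPool(5)
implicit val ec: ExecutionContext = ExecutionContext.fromExecutorService(pool)

// Submit 20 tasks at once; the fixed pool lets at most 5 of them run concurrently.
val all = Future.sequence((1 to 20).map(i => Future(fakeNotebook(i))))
val results = Await.result(all, 30.seconds)
pool.shutdown() // release the pool's threads once all work has finished

println(s"completed=${results.size}, peak concurrency=${peak.get}")
```

Unlike the Databricks example, which passes `Executors.newFixedThreadPool(...)` straight into `fromExecutor`, this sketch keeps the pool reference so it can call `shutdown()` when the work is done.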

Best answer

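Here is one attempt. Since your code doesn't compile outside Databricks, I inserted some dummy classes.
Also, you didn't fully specify the desired behavior, so I made some assumptions. Each notebook is retried at most five times. If any one Future still fails after five retries, the whole Future fails. Both behaviors can be changed, but since you didn't specify, I'm not sure which you want.
If you have any questions or want me to change the program, let me know in the comments.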
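The "whole Future fails" behavior comes from `Future.sequence`, whose result fails if any of the input Futures fails. The original Databricks code avoids that by calling `.recover` on each notebook's Future, turning a failure into an "ERROR: ..." string. A minimal sketch of the difference, assuming a hypothetical `fakeRun` in place of `dbutils.notebook.run`:

```scala
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._
import scala.util.Try
import scala.util.control.NonFatal

implicit val ec: ExecutionContext = ExecutionContext.global

// Hypothetical stand-in for a notebook run: "bad" throws, everything else succeeds.
def fakeRun(path: String): String =
  if (path == "bad") throw new RuntimeException(s"$path failed") else s"$path ok"

val paths = Seq("good1", "bad", "good2")

// With .recover (as in the Databricks example): failures become strings,
// so the sequenced Future still succeeds.
val withRecover: Future[Seq[String]] = Future.sequence(paths.map { p =>
  Future(fakeRun(p)).recover { case NonFatal(e) => s"ERROR: ${e.getMessage}" }
})

// Without .recover: one failed element fails the whole sequence.
val withoutRecover: Future[Seq[String]] = Future.sequence(paths.map(p => Future(fakeRun(p))))

val recovered = Await.result(withRecover, 10.seconds)
// Await.ready waits without rethrowing; .value is then Some(Success(...)) or Some(Failure(...)).
val failed: Option[Try[Seq[String]]] = Await.ready(withoutRecover, 10.seconds).value

println(recovered) // List(good1 ok, ERROR: bad failed, good2 ok)
println(failed)
```

Note that `Await.result` on the failing Future would rethrow the notebook's exception, whereas `Await.ready` followed by `.value` lets you inspect the failure.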

object TestNotebookData extends App {
  // parallel notebook code

  import scala.concurrent.{Future, Await}
  import scala.concurrent.duration._
  import scala.util.control.NonFatal

  case class NotebookData(path: String, timeout: Int, parameters: Map[String, String] = Map.empty[String, String])

  // Dummy stand-ins for Databricks' dbutils so this compiles outside Databricks.
  // Like dbutils.notebook.run, the dummy run returns the notebook's exit value as a String.
  case class Context()

  case class Notebook() {
    def getContext(): Context = Context()
    def setContext(ctx: Context): Unit = ()
    def run(path: String, timeout: Int, parameters: Map[String, String] = Map()): String = ""
  }
  case class Dbutils(notebook: Notebook)

  val dbutils = Dbutils(Notebook())

  def parallelNotebooks(notebooks: Seq[NotebookData]): Future[Seq[String]] = {
    import java.util.concurrent.Executors
    import scala.concurrent.ExecutionContext

    // This code limits the number of parallel notebooks.
    // Note: this pool is never shut down, so a standalone JVM may not exit on its own.
    val numNotebooksInParallel = 5
    implicit val ec: ExecutionContext = ExecutionContext.fromExecutor(Executors.newFixedThreadPool(numNotebooksInParallel))
    val ctx = dbutils.notebook.getContext()

    val isRetryable = true
    val retries = 5

    def runNotebook(notebook: NotebookData): Future[String] = {
      // Run the notebook; on a non-fatal failure, try again until `current` reaches `max`,
      // after which the last failure propagates.
      def retryWrapper(retry: Boolean, current: Int, max: Int): Future[String] = {
        val fut = Future { runNotebookInner() }
        if (retry && current < max) fut.recoverWith { case NonFatal(_) => retryWrapper(retry, current + 1, max) }
        else fut
      }

      def runNotebookInner(): String = {
        dbutils.notebook.setContext(ctx)
        if (notebook.parameters.nonEmpty)
          dbutils.notebook.run(notebook.path, notebook.timeout, notebook.parameters)
        else
          dbutils.notebook.run(notebook.path, notebook.timeout)
      }

      retryWrapper(isRetryable, 0, retries)
    }

    Future.sequence(
      notebooks.map { notebook =>
        runNotebook(notebook)
      }
    )
  }

  val notebooks = Seq(
    NotebookData("Notebook1", 0, Map("client" -> "client")),
    NotebookData("Notebook2", 0, Map("client" -> "client"))
  )
  val res = parallelNotebooks(notebooks)
  Await.result(res, 3000000.seconds) // this is a blocking call.
  println(res.value)
}
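To see exactly what "five retries" means in `retryWrapper`, here is a minimal standalone sketch with the same recursion shape (`retryFuture` and `flaky` are hypothetical names, not part of any Databricks API). Because the counter starts at 0 and a retry is scheduled only while `current < max`, `max = 5` allows one initial attempt plus five retries, i.e. six attempts in total.

```scala
import java.util.concurrent.atomic.AtomicInteger
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._
import scala.util.control.NonFatal

implicit val ec: ExecutionContext = ExecutionContext.global

// Same shape as retryWrapper: run once, and on a non-fatal failure recurse
// with current + 1 until current reaches max.
def retryFuture[T](current: Int, max: Int)(op: () => T): Future[T] = {
  val fut = Future(op())
  if (current < max) fut.recoverWith { case NonFatal(_) => retryFuture(current + 1, max)(op) }
  else fut
}

// Hypothetical flaky task: fails on its first `failures` calls, counting every attempt.
def flaky(failures: Int, attempts: AtomicInteger): () => String = () => {
  val n = attempts.incrementAndGet()
  if (n <= failures) throw new RuntimeException(s"attempt $n failed")
  else s"succeeded on attempt $n"
}

val a1 = new AtomicInteger(0)
val ok = Await.result(retryFuture(0, 5)(flaky(3, a1)), 10.seconds)
println(ok) // succeeded on attempt 4

val a2 = new AtomicInteger(0)
val gaveUp = Await.ready(retryFuture(0, 5)(flaky(100, a2)), 10.seconds).value
println(a2.get) // 6
```

If you would rather have a notebook that exhausts its retries reported as an "ERROR: ..." string instead of failing the whole sequence, one option is to append the original `.recover { case NonFatal(e) => s"ERROR: ${e.getMessage}" }` to the Future returned by `retryWrapper`.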

Regarding "scala - Adding retries to a sequence of futures to run Databricks notebooks in parallel in Scala", a similar question can be found on Stack Overflow: https://stackoverflow.com/questions/62822025/
