gpt4 book ai didi

scala - Spark如何处理涉及JDBC数据源的故障场景?

转载 作者:行者123 更新时间:2023-12-04 11:31:21 25 4
gpt4 key购买 nike

我正在写一个与 Spark 的 JDBC 数据源实现有相似之处的数据源,我想问一下 Spark 如何处理某些故障场景。据我所知,如果执行器在运行任务时死亡,Spark 将恢复执行器并尝试重新运行该任务。但是,这在数据完整性和 Spark 的 JDBC 数据源 API(例如 df.write.format("jdbc").option(...).save())的上下文中如何发挥作用?

JdbcUtils.scala的savePartition函数中,我们看到 Spark 调用了从用户提供的数据库 url/credentials 生成的 Java 连接对象的 commit 和 rollback 函数(见下文)。但是如果一个 executor 在 commit() 完成之后或在 rollback() 被调用之前就死了,Spark 是否会尝试重新运行任务并再次写入相同的数据分区,实际上是在数据库中创建重复的提交行?如果 executor 在调用 commit() 或 rollback() 的过程中死亡,会发生什么?

try {
...
if (supportsTransactions) {
conn.commit()
}
committed = true
Iterator.empty
} catch {
case e: SQLException =>
...
throw e
} finally {
if (!committed) {
// The stage must fail. We got here through an exception path, so
// let the exception through unless rollback() or close() want to
// tell the user about another problem.
if (supportsTransactions) {
conn.rollback()
}
conn.close()
} else {
...
}
}

最佳答案

But if an executor dies right after commit() finishes or before rollback() is called, does Spark try to re-run the task and write the same data partition again, essentially creating duplicate committed rows in the database?



由于 Spark SQL(它是 RDD API 之上的高级 API)并不真正了解 JDBC 或任何其他协议(protocol)的所有特性,您会期望什么?更不用说底层执行运行时了,即 Spark Core。

当您编写像 df.write.format(“jdbc”).option(...).save() 这样的结构化查询时Spark SQL 使用类似于低级程序集的 RDD API 将其转换为分布式计算。由于它试图包含尽可能多的“协议(protocol)”(包括 JDBC),Spark SQL 的 DataSource API 将大部分错误处理留给了数据源本身。

Spark 调度任务的核心(不知道甚至不关心任务做什么)只是监视执行,如果任务失败,它将尝试再次执行它(直到默认情况下尝试失败 3 次)。

因此,当您编写自定义数据源时,您就了解了演练,并且必须在代码中处理此类重试。

处理错误的一种方法是使用 TaskContext 注册一个任务监听器。 (例如 addTaskCompletionListeneraddTaskFailureListener )。

关于scala - Spark如何处理涉及JDBC数据源的故障场景?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/54118841/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com