gpt4 book ai didi

apache-spark - 收集 Spark 作业运行统计信息并保存到数据库的最佳方法是什么

转载 作者:行者123 更新时间:2023-12-04 04:39:00 25 4
gpt4 key购买 nike

我的 Spark 程序有几个表连接(使用 SPARKSQL),我想收集处理每个连接所需的时间并保存到统计表中。目的是在一段时间内连续运行它并以非常精细的级别收集性能。

例如

val DF1= spark.sql("从 A,B 中选择 x,y")

Val DF2 =spark.sql("从 TABLE1,TABLE2 中选择 k,v")

最后,我加入了 DF1 和 DF2,然后启动了类似 saveAsTable 的操作。

我正在寻找的是弄清楚

1.计算DF1真正需要多少时间

2.计算DF2需要多少时间和

3. 将这些最终加入保留到 Hive/HDFS 的时间

并将所有这些信息放入 RUN-STATISTICS 表/文件中。

任何帮助表示赞赏并提前感谢

最佳答案

Spark 使用延迟评估,允许引擎在非常精细的级别上优化 RDD 转换。
当你执行

val DF1= spark.sql("select x,y from A,B ")
除了将转换添加到有向无环图之外,什么也没有发生。
只有当你执行一个Action,比如DF1.count,驱动才会强制执行一个物理执行计划。这被尽可能地推迟到 RDD 转换链的下游。
所以问是不对的

1.How much time it really took to compute DF1

2.How much time to compute DF2 and


至少基于您提供的代码示例。您的代码没有“计算”val DF1。我们可能不知道处理 DF1 花了多长时间,除非您以某种方式欺骗编译器分别处理每个数据帧。
构建问题的更好方法可能是“我的工作总体分为多少阶段(任务),完成这些阶段(任务)需要多长时间”?
这可以通过查看日志文件/Web GUI 时间线轻松回答(根据您的设置有不同的风格)

3.How much time to persist those final Joins to Hive / HDFS


公平的问题。看看神经节

Cluster-wide monitoring tools, such as Ganglia, can provide insight into overall cluster utilization and resource bottlenecks. For instance, a Ganglia dashboard can quickly reveal whether a particular workload is disk bound, network bound, or CPU bound.


我喜欢使用它定义每个转换序列的另一个技巧,这些转换必须以单独函数内的 Action 结束,然后在“计时器函数” block 内的输入 RDD 上调用该函数。
例如,我的“计时器”是这样定义的
def time[R](block: => R): R = {
val t0 = System.nanoTime()
val result = block
val t1 = System.nanoTime()
println("Elapsed time: " + (t1 - t0)/1e9 + "s")
result
}
并且可以用作
val df1 = Seq((1,"a"),(2,"b")).toDF("id","letter")

scala> time{df1.count}
Elapsed time: 1.306778691s
res1: Long = 2
但是,不要仅仅为了将 DAG 分解为更多阶段/广泛的依赖关系而调用不必要的操作。这可能会导致洗牌或减慢您的执行速度。
资源:
https://spark.apache.org/docs/latest/monitoring.html
http://ganglia.sourceforge.net/
https://www.youtube.com/watch?v=49Hr5xZyTEA

关于apache-spark - 收集 Spark 作业运行统计信息并保存到数据库的最佳方法是什么,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/50050804/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com