gpt4 book ai didi

apache-spark - 在 Spark 上下文 (JobProgressListener) 上使用多个同时作业进行 Spark 2 作业监控

转载 作者:行者123 更新时间:2023-12-05 07:46:15 25 4
gpt4 key购买 nike

在 Spark 2.0.x 上,我一直在使用 JobProgressListener从我们的集群中实时检索作业/阶段/任务进度信息的实现。我了解事件流程的运作方式,并成功收到工作更新。

我的问题是,我们有多个不同的提交在同一个 Spark 上下文中同时运行,而且似乎无法区分每个提交属于哪个作业/阶段/任务。每个作业/阶段/任务都会收到一个唯一的 ID,这很棒。但是,我正在寻找一种方法来提供将与接收到的 JobProgressListener 事件对象一起返回的提交“id”或“name”。

我知道可以在 Spark Context 上设置“Job Group”,但是如果多个作业同时运行在同一个上下文中,它们就会变得杂乱无章。

有没有一种方法可以潜入自定义属性,这些属性将与单个 SQLContext 的监听器事件一起返回?这样一来,我应该能够链接后续的 Stage 和 Task 事件并获得我需要的东西。

请注意:我没有为这些作业使用 spark-submit。它们是使用对 SparkSession/SQLContext 的 Java 引用执行的。

感谢您提供任何解决方案或想法。

最佳答案

我正在使用本地属性 - 这可以在 onStageSubmit 事件期间从监听器访问。之后,我使用相应的 stageId 来识别在该阶段执行的任务。

Future({
sc.setLocalProperty("job-context", "second")
val listener = new MetricListener("second")
sc.addSparkListener(listener)
//do some spark actions
val df = spark.read.load("...")
val countResult = df.filter(....).count()
println(listener.rows)
sc.removeSparkListener(listener)
})

class MetricListener(name:String) extends SparkListener{

var rows: Long = 0L
var stageId = -1

override def onStageSubmitted(stageSubmitted: SparkListenerStageSubmitted): Unit = {
if (stageSubmitted.properties.getProperty("job-context") == name){
stageId = stageSubmitted.stageInfo.stageId
}
}

override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = {
if (taskEnd.stageId == stageId)
rows = rows + taskEnd.taskMetrics.inputMetrics.recordsRead
}

}

关于apache-spark - 在 Spark 上下文 (JobProgressListener) 上使用多个同时作业进行 Spark 2 作业监控,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/40935942/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com