gpt4 book ai didi

scala - 以编程方式向 Spark session 添加/删除执行程序

转载 作者:行者123 更新时间:2023-12-01 10:07:33 25 4
gpt4 key购买 nike

我正在 Spark (v2+) 中寻找一种可靠的方法来以编程方式调整 session 中的执行程序数量。

我知道动态分配以及在创建 session 时配置 spark 执行程序的能力(例如使用 --num-executors ),但由于我的 Spark 工作的性质,这些选项对我来说都不是很有用。

我的 Spark 工作

该作业对大量数据执行以下步骤:

  • 对数据执行一些聚合/检查
  • 将数据加载到 Elasticsearch(ES 集群通常比 Spark 集群小很多)

  • 问题
  • 如果我使用全套可用的 Spark 资源,我会非常
    快速重载 Elasticsearch 甚至可能会破坏
    Elasticsearch 节点。
  • 如果我使用足够少的 spark executors 以免压倒
    Elasticsearch,第 1 步需要 更长比它需要的(因为它有一个
    可用 Spark 资源的一小部分)

  • 我很感激我可以将此作业拆分为两个作业,分别使用不同的 Spark 资源配置文件执行,但我真正想要的是在我的 Spark 脚本中的特定点(在 Elasticsearch 加载开始之前)以编程方式将执行程序的数量设置为 X )。这似乎是一件很有用的事情,可以普遍做到。

    我最初的尝试

    我尝试了一些更改设置并找到了一些有用的东西,但感觉像是一种做某事的hacky方式,应该以更标准化和受支持的方式可行。

    我的尝试(这只是我在玩):
    def getExecutors = spark.sparkContext.getExecutorStorageStatus.toSeq.map(_.blockManagerId).collect { 
    case bm if !bm.isDriver => bm
    }

    def reduceExecutors(totalNumber: Int): Unit = {
    //TODO throw error if totalNumber is more than current
    logger.info(s"""Attempting to reduce number of executors to $totalNumber""")
    spark.sparkContext.requestTotalExecutors(totalNumber, 0, Map.empty)
    val killedExecutors = scala.collection.mutable.ListBuffer[String]()
    while (getExecutors.size > totalNumber) {
    val executorIds = getExecutors.map(_.executorId).filterNot(killedExecutors.contains(_))
    val executorsToKill = Random.shuffle(executorIds).take(executorIds.size - totalNumber)
    spark.sparkContext.killExecutors(executorsToKill)
    killedExecutors ++= executorsToKill
    Thread.sleep(1000)
    }
    }

    def increaseExecutors(totalNumber: Int): Unit = {
    //TODO throw error if totalNumber is less than current
    logger.info(s"""Attempting to increase number of executors to $totalNumber""")
    spark.sparkContext.requestTotalExecutors(totalNumber, 0, Map.empty)
    while (getExecutors.size < totalNumber) {
    Thread.sleep(1000)
    }
    }

    最佳答案

    您可以尝试的一件事是调用

    val dfForES = df.coalesce(numberOfParallelElasticSearchUploads) 

    在步骤#2 之前。这将减少分区的数量而不会产生混洗开销,并确保只有 max numberOfParallelElasticSearchUploads 执行器并行地向 ES 发送数据,而其余执行器则处于空闲状态。

    如果您在共享集群上运行您的作业,我仍然建议启用动态分配以释放这些空闲的执行程序以获得更好的资源利用率。

    关于scala - 以编程方式向 Spark session 添加/删除执行程序,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/51397880/

    25 4 0
    Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
    广告合作:1813099741@qq.com 6ren.com