gpt4 book ai didi

scala - 'spark.driver.maxResultSize' 的范围

转载 作者:行者123 更新时间:2023-12-04 08:48:15 30 4
gpt4 key购买 nike

我正在运行 Spark 作业来聚合数据。我有一个名为 Profile 的自定义数据结构,它基本上包含一个 mutable.HashMap[Zone, Double] .我想使用以下代码合并共享给定 key (UUID)的所有配置文件:

def merge = (up1: Profile, up2: Profile) => { up1.addWeights(up2); up1}
val aggregated = dailyProfiles
.aggregateByKey(new Profile(), 3200)(merge, merge).cache()

奇怪的是,Spark 失败并出现以下错误:

org.apache.spark.SparkException: Job aborted due to stage failure: Total size of serialized results of 116318 tasks (1024.0 MB) is bigger than spark.driver.maxResultSize (1024.0 MB)



显而易见的解决方案是增加“spark.driver.maxResultSize”,但有两件事让我感到困惑。
  • 太巧了,我得到的 1024.0 大于 1024.0
  • 我在谷歌上搜索此特定错误和配置参数时发现的所有文档和帮助都表明它会影响将值返回给驱动程序的函数。 (比如 take()collect() ),但我没有给驱动程序带来任何东西,只是从 HDFS 读取,聚合,保存回 HDFS。

  • 有谁知道我为什么会收到这个错误?

    最佳答案

    Yes, It's failing because The values we see in exception message arerounded off by one precision and comparison happening in bytes.

    That serialized output must be more than 1024.0 MB and less than 1024.1 MB.


    检查添加的 Apache Spark 代码片段,这个错误非常有趣并且非常罕见。 :)
    这里 totalResultSize > maxResultSize两者都是 Long 类型并且 in 以字节为单位保存值。但是 msg保存来自 Utils.bytesToString() 的舍入值.
    //TaskSetManager.scala
    def canFetchMoreResults(size: Long): Boolean = sched.synchronized {
    totalResultSize += size
    calculatedTasks += 1
    if (maxResultSize > 0 && totalResultSize > maxResultSize) {
    val msg = s"Total size of serialized results of ${calculatedTasks} tasks " +
    s"(${Utils.bytesToString(totalResultSize)}) is bigger than spark.driver.maxResultSize " +
    s"(${Utils.bytesToString(maxResultSize)})"
    logError(msg)
    abort(msg)
    false
    } else {
    true
    }
    }
    Apache Spark 1.3 - source
    //Utils.scala
    def bytesToString(size: Long): String = {
    val TB = 1L << 40
    val GB = 1L << 30
    val MB = 1L << 20
    val KB = 1L << 10

    val (value, unit) = {
    if (size >= 2*TB) {
    (size.asInstanceOf[Double] / TB, "TB")
    } else if (size >= 2*GB) {
    (size.asInstanceOf[Double] / GB, "GB")
    } else if (size >= 2*MB) {
    (size.asInstanceOf[Double] / MB, "MB")
    } else if (size >= 2*KB) {
    (size.asInstanceOf[Double] / KB, "KB")
    } else {
    (size.asInstanceOf[Double], "B")
    }
    }
    "%.1f %s".formatLocal(Locale.US, value, unit)
    }
    Apache Spark 1.3 - source

    关于scala - 'spark.driver.maxResultSize' 的范围,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/32530239/

    30 4 0
    Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
    广告合作:1813099741@qq.com 6ren.com