gpt4 book ai didi

scala - SparkError : Total size of serialized results of XXXX tasks (2. 0 GB)大于spark.driver.maxResultSize(2.0 GB)

转载 作者:行者123 更新时间:2023-12-03 01:45:16 27 4
gpt4 key购买 nike

错误:

ERROR TaskSetManager: Total size of serialized results of XXXX tasks (2.0 GB) is bigger than spark.driver.maxResultSize (2.0 GB)

目标:获得所有使用该模型的用户的推荐,并与每个用户的测试数据重叠并生成重叠率。

我使用 Spark mllib 构建了一个推荐模型。我评估每个用户的测试数据和每个用户的推荐项目的重叠率,并生成平均重叠率。

  def overlapRatio(model: MatrixFactorizationModel, test_data: org.apache.spark.rdd.RDD[Rating]): Double = {

val testData: RDD[(Int, Iterable[Int])] = test_data.map(r => (r.user, r.product)).groupByKey
val n = testData.count

val recommendations: RDD[(Int, Array[Int])] = model.recommendProductsForUsers(20)
.mapValues(_.map(r => r.product))

val overlaps = testData.join(recommendations).map(x => {
val moviesPerUserInRecs = x._2._2.toSet
val moviesPerUserInTest = x._2._1.toSet
val localHitRatio = moviesPerUserInRecs.intersect(moviesPerUserInTest)
if(localHitRatio.size > 0)
1
else
0
}).filter(x => x != 0).count

var r = 0.0
if (overlaps != 0)
r = overlaps / n

return r

}

但这里的问题是它最终会抛出以上 maxResultSize 错误。在我的 Spark 配置中,我执行了以下操作来增加 maxResultSize

val conf = new SparkConf()
conf.set("spark.driver.maxResultSize", "6g")

但这并没有解决问题,我几乎接近分配驱动程序内存的数量,但问题没有得到解决。当代码执行时,我一直关注着我的 Spark 作业,我所看到的有点令人费解。

[Stage 281:==>   (47807 + 100) / 1000000]15/12/01 12:27:03 ERROR TaskSetManager: Total size of serialized results of 47809 tasks (6.0 GB) is bigger than spark.driver.maxResultSize (6.0 GB)

在上述阶段,代码正在 Spark-mllib recommendForAll 中的 line 277 处执行 MatrixFactorization 代码(不确定行号)。

  private def recommendForAll(
rank: Int,
srcFeatures: RDD[(Int, Array[Double])],
dstFeatures: RDD[(Int, Array[Double])],
num: Int): RDD[(Int, Array[(Int, Double)])] = {
val srcBlocks = blockify(rank, srcFeatures)
val dstBlocks = blockify(rank, dstFeatures)
val ratings = srcBlocks.cartesian(dstBlocks).flatMap {
case ((srcIds, srcFactors), (dstIds, dstFactors)) =>
val m = srcIds.length
val n = dstIds.length
val ratings = srcFactors.transpose.multiply(dstFactors)
val output = new Array[(Int, (Int, Double))](m * n)
var k = 0
ratings.foreachActive { (i, j, r) =>
output(k) = (srcIds(i), (dstIds(j), r))
k += 1
}
output.toSeq
}
ratings.topByKey(num)(Ordering.by(_._2))
}

recommendForAll 方法从 recommendProductsForUsers 方法调用。

但看起来该方法正在分拆 100 万个任务。收到的数据来自 2000 个零件文件,所以我很困惑它是如何开始吐出 1M 任务的,我认为这可能是问题所在。

我的问题是如何才能真正解决这个问题。如果不使用这种方法,就很难计算重叠率recall@K。这是 Spark 1.5 (cloudera 5.5)

最佳答案

2GB 问题对于 Spark 社区来说并不新鲜:https://issues.apache.org/jira/browse/SPARK-6235

重新/分区大小大于2GB,尝试将您的RDD重新分区(myRdd.repartition(parallelism))到更多数量的分区(w/r/t/您当前的并行度级别) ),从而减少每个分区的大小。

关于旋转的任务数量(因此创建分区),我的假设是它可能来自 srcBlocks.cartesian(dstBlocks) API 调用,该调用生成由 ( z = srcBlocks 的分区数 * dstBlocks 的分区数)分区数。

在这种情况下,您可以考虑利用 myRdd.coalesce(parallelism) API 而不是 repartition API 来避免随机播放(以及与分区序列化相关的问题)。

关于scala - SparkError : Total size of serialized results of XXXX tasks (2. 0 GB)大于spark.driver.maxResultSize(2.0 GB),我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/34035523/

27 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com