- html - 出于某种原因,IE8 对我的 Sass 文件中继承的 html5 CSS 不友好?
- JMeter 在响应断言中使用 span 标签的问题
- html - 在 :hover and :active? 上具有不同效果的 CSS 动画
- html - 相对于居中的 html 内容固定的 CSS 重复背景?
错误:
ERROR TaskSetManager: Total size of serialized results of XXXX tasks (2.0 GB) is bigger than spark.driver.maxResultSize (2.0 GB)
目标:获得所有使用该模型的用户的推荐,并与每个用户的测试数据重叠并生成重叠率。
我使用 Spark mllib 构建了一个推荐模型。我评估每个用户的测试数据和每个用户的推荐项目的重叠率,并生成平均重叠率。
def overlapRatio(model: MatrixFactorizationModel, test_data: org.apache.spark.rdd.RDD[Rating]): Double = {
val testData: RDD[(Int, Iterable[Int])] = test_data.map(r => (r.user, r.product)).groupByKey
val n = testData.count
val recommendations: RDD[(Int, Array[Int])] = model.recommendProductsForUsers(20)
.mapValues(_.map(r => r.product))
val overlaps = testData.join(recommendations).map(x => {
val moviesPerUserInRecs = x._2._2.toSet
val moviesPerUserInTest = x._2._1.toSet
val localHitRatio = moviesPerUserInRecs.intersect(moviesPerUserInTest)
if(localHitRatio.size > 0)
1
else
0
}).filter(x => x != 0).count
var r = 0.0
if (overlaps != 0)
r = overlaps / n
return r
}
但这里的问题是它最终会抛出以上 maxResultSize
错误。在我的 Spark 配置中,我执行了以下操作来增加 maxResultSize
。
val conf = new SparkConf()
conf.set("spark.driver.maxResultSize", "6g")
但这并没有解决问题,我几乎接近分配驱动程序内存的数量,但问题没有得到解决。当代码执行时,我一直关注着我的 Spark 作业,我所看到的有点令人费解。
[Stage 281:==> (47807 + 100) / 1000000]15/12/01 12:27:03 ERROR TaskSetManager: Total size of serialized results of 47809 tasks (6.0 GB) is bigger than spark.driver.maxResultSize (6.0 GB)
在上述阶段,代码正在 Spark-mllib recommendForAll
中的 line 277
处执行 MatrixFactorization 代码(不确定行号)。
private def recommendForAll(
rank: Int,
srcFeatures: RDD[(Int, Array[Double])],
dstFeatures: RDD[(Int, Array[Double])],
num: Int): RDD[(Int, Array[(Int, Double)])] = {
val srcBlocks = blockify(rank, srcFeatures)
val dstBlocks = blockify(rank, dstFeatures)
val ratings = srcBlocks.cartesian(dstBlocks).flatMap {
case ((srcIds, srcFactors), (dstIds, dstFactors)) =>
val m = srcIds.length
val n = dstIds.length
val ratings = srcFactors.transpose.multiply(dstFactors)
val output = new Array[(Int, (Int, Double))](m * n)
var k = 0
ratings.foreachActive { (i, j, r) =>
output(k) = (srcIds(i), (dstIds(j), r))
k += 1
}
output.toSeq
}
ratings.topByKey(num)(Ordering.by(_._2))
}
recommendForAll
方法从 recommendProductsForUsers
方法调用。
但看起来该方法正在分拆 100 万个任务。收到的数据来自 2000 个零件文件,所以我很困惑它是如何开始吐出 1M 任务的,我认为这可能是问题所在。
我的问题是如何才能真正解决这个问题。如果不使用这种方法,就很难计算重叠率
或recall@K
。这是 Spark 1.5 (cloudera 5.5)
最佳答案
2GB 问题对于 Spark 社区来说并不新鲜:https://issues.apache.org/jira/browse/SPARK-6235
重新/分区大小大于2GB,尝试将您的RDD重新分区(myRdd.repartition(parallelism)
)到更多数量的分区(w/r/t/您当前的并行度级别) ),从而减少每个分区的大小。
关于旋转的任务数量(因此创建分区),我的假设是它可能来自 srcBlocks.cartesian(dstBlocks)
API 调用,该调用生成由 ( z = srcBlocks 的分区数 * dstBlocks 的分区数)分区数。
在这种情况下,您可以考虑利用 myRdd.coalesce(parallelism)
API 而不是 repartition
API 来避免随机播放(以及与分区序列化相关的问题)。
关于scala - SparkError : Total size of serialized results of XXXX tasks (2. 0 GB)大于spark.driver.maxResultSize(2.0 GB),我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/34035523/
这个问题在这里已经有了答案: Why use async and return await, when you can return Task directly? (8 个答案) 关闭 6 年前。
这个问题在这里已经有了答案: Are the days of passing const std::string & as a parameter over? (13 个答案) 关闭 8 年前。 我
我有一组标记为执行的通用任务。当任务完成时(使用 Task.WaitAny ),我将其添加到 ObservableCollection 中. 但是,问题出在 Task.WaitAny(...)行,上面
经过几个小时的努力,我在我的应用程序中发现了一个错误。我认为下面的 2 个函数具有相同的行为,但事实证明它们没有。 谁能告诉我引擎盖下到底发生了什么,以及为什么它们的行为方式不同? public as
这也与 Python 的导入机制有关,特别是与在函数内使用 import 有关。使用 Python 2.7.9 和 Fabric 1.10.0,创建以下三个文件: fabfile.py: from a
我有一个 Web API Controller (ASP.NET Core 5)。我的一些 API 是异步的,而其中一些不是。我接下来的问题是:使用 public **Task** WebApiMet
我们有类似下面的内容 List uncheckItems = new List(); for (int i = 0; i new Task(async () => await Process
我的代码没问题,但我想知道哪种风格更好,你会怎么看,我正在玩异步方法。 让我建立上下文: Parallel.ForEach(xmlAnimalList, async xml => {
这两种使用 await 的形式在功能上有什么区别吗? string x = await Task.Factory.StartNew(() => GetAnimal("feline")); Task m
我刚刚看到 3 个关于 TPL 使用的例程,它们做同样的工作;这是代码: public static void Main() { Thread.CurrentThread.Name = "Ma
考虑以下代码: public void CacheData() { Task.Run((Action)CacheExternalData); Task.Run(() => CacheE
Task> GetTaskDict() { return Task.FromResult(new Dictionary () ); } 此代码无法编译,因为我们无法在 Task> 到 Tas
我正在使用 ASP.NET 5 RC1 _MyPartial @model MyViewModel @using (Html.BeginForm())
当我尝试在 VS Code 中构建 C 任务时,它显示以下消息: 输出仅显示:The task provider for "C/C++" tasks unexpectedly provided a t
一些背景: 基本上归结为我希望能够在当前线程中“执行”任务。为什么? -我有一个任务创建程序例程,有一次我希望任务在后台任务中立即执行,而其他时候我希望使用 IOmniThreadPool 安排任务。
就目前而言,这个问题不适合我们的问答形式。我们希望答案得到事实、引用或专业知识的支持,但这个问题可能会引起辩论、争论、投票或扩展讨论。如果您觉得这个问题可以改进并可能重新打开,visit the he
我试图将run-sequence添加到我的gulp工作流程中,但是每次尝试执行使用run-sequence的任务时,都会出现此错误: 任务未配置为gulp上的任务。 根据运行序列的来源,这是由以下te
此代码在VS2015中给出了编译时错误 Error CS0266 Cannot implicitly convert type 'System.Threading.Tasks.Task' to 'Sy
我正在尝试通过我的代码通过Google登出: suspend fun signOut(context: Context): Boolean = with(Dispatchers.IO) { t
谁能解释一下这两种说法的区别: Task bTask = backup.BackupCurrentDatabaseAsync() .ContinueWith(_ => CompressArch
我是一名优秀的程序员,十分优秀!