gpt4 book ai didi

scala - Spark 内存限制超出问题

转载 作者:可可西里 更新时间:2023-11-01 16:28:55 26 4
gpt4 key购买 nike

我有一份在 spark 上运行的工作,它是使用 spark RDD 在 scala im 中编写的。由于昂贵的分组操作我得到这个错误: 容器因超出内存限制而被 YARN 终止。使用了 22.4 GB 的 22 GB 物理内存。考虑提升 spark.yarn.executor.memoryOverhead。 我增加了头顶的内存,但我得到了同样的结果。我使用 10 台 r4.xlarge 机器。我尝试使用 r4.2xlarge 甚至 r4.4xlarge,但也出现同样的错误。我正在测试的数据是 5GB 压缩数据(将近 50 个解压缩数据和近 600 万条记录)。

一些配置:

spark.executor.memory:20480Mspark.driver.memory:21295Mspark.yarn.executor.memoryOverhead:2gspark.executor.instances:10

代码如下所示:

val groupedEntitiesRDD = datasetRDD 
.groupBy(_.entityId)
.map({ case (key, valueIterator) => key -> valueIterator.toList })
.persist(StorageLevel.MEMORY_AND_DISK)

val deduplicatedRDD = groupedEntitiesRDD
.flatMap({ case (_, entities) => deduplication(entities) })

def deduplication(entities: List[StreamObject[JsValue]]): List[StreamObject[JsValue]] = {
entities
.groupBy(_.deduplicationKey)
.values
.map(duplicates => duplicates.maxBy(_.processingTimestamp.toEpochSecond))
.toList
}

最佳答案

根据我的经验和我在 Spark 2.x 的发行说明中阅读的内容,需要分配比在 Spark 1.x。

你只分配了 2G 给 memoryOverhead 和 20GB 内存。我相信如果将其更改为 8G memoryOverhead 和 14GB executor 内存,您会得到更好的结果。

如果您仍然遇到内存问题(例如抛出实际的 OOM),您将需要查看数据偏差。尤其是groupBy操作会频繁导致严重的数据倾斜。

最后一件事,您写道您使用 RDD - 我希望您指的是 DataFramesDataSetsRDDsgroupBy 的性能非常低(参见 this 博客文章了解原因)所以如果你在 RDDs 你应该使用reduceByKey 代替。但本质上,您应该改用 DataFrames(或 DataSet),其中 groupBy 确实是正确的方法。

编辑!

您在评论中询问如何将 groupBy 转换为 reduceByKey。你可以这样做:

datasetRDD
.map{case(entityID, streamObject) => (entityID, List(streamObject))}
.reduceByKey(_++_)
.flatMap{case(_, entities) => deduplication(entities)

您没有指定这些实体的数据结构,但看起来您正在寻找一些最大值并实际上丢弃了不需要的数据。这应该构建到 reduceByKey 操作中,这样您就可以在减少的同时过滤掉不必要的数据。

关于scala - Spark 内存限制超出问题,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/45479813/

26 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com