- html - 出于某种原因,IE8 对我的 Sass 文件中继承的 html5 CSS 不友好?
- JMeter 在响应断言中使用 span 标签的问题
- html - 在 :hover and :active? 上具有不同效果的 CSS 动画
- html - 相对于居中的 html 内容固定的 CSS 重复背景?
我们正在使用 Apache Spark 2.1.1 生成一些每日报告。这些报告是根据一些日常数据生成的,我们在分别运行每个单元的报告并将它们合并在一起之前坚持这些数据。这是我们正在做的事情的简化版本:
def unitReport(d: Date, df: DataFrame, u: String): DataFrame = ... // Builds a report based on unit `u`
val date: Date = ... // Date to run the report
val dailyData: DataFrame = someDailyData.persist() // Daily data
val units: Seq[String] = Seq("Unit_A", "Unit_B", "Unit_C")
val report: DataFrame =
units.map(unitReport(date, dailyData, _)) // Report for each unit.
.reduce((a, b) => a.union(b)) // Join all the units together.
在此之后,我们将报告作为 csv 写入 HDFS,将各部分连接在一起,然后通过电子邮件发送报告。
我们已经开始遇到这些报告中最大的问题,该报告在大约五十个单位上运行。我们不断提高最大结果大小(现在为 10G)以及驱动器内存并不断达到它。这里令人困惑的事情是 a) 我们从来没有将结果拉回到驱动程序和 b) 最终输出的报告仅占用 145k 和 1298 行 CSV 格式,为什么我们要传递 8G 的 maxResultSize
?关于 Spark 如何管理内存、resultSize
中究竟包含什么以及将什么发送回驱动程序,我们觉得有些事情我们不了解,但很难找到任何解释或文档。以下是报告最后阶段的片段,就在它开始耗尽内存之前,让您了解报告的复杂性:
[Stage 2297:===========================================> (4822 + 412) / 5316]
[Stage 2297:===========================================> (4848 + 394) / 5316]
[Stage 2297:============================================> (4877 + 370) / 5316]
[Stage 2297:============================================> (4909 + 343) / 5316]
[Stage 2297:============================================> (4944 + 311) / 5316]
[Stage 2297:============================================> (4964 + 293) / 5316]
[Stage 2297:============================================> (4980 + 278) / 5316]
[Stage 2297:=============================================> (4996 + 266) / 5316]
[Stage 2297:=============================================> (5018 + 246) / 5316]
我们通过以下代码发现了我们认为类似的内存效应:
import org.apache.spark.mllib.random.RandomRDDs._
val df = normalRDD(sc, 1000000000L, 1000000).toDF()
df.filter($"value" > 0.9).count()
虽然此代码仅返回一个简单的计数,但当我们最终在驱动程序上遇到此内存不足错误时:
java.lang.OutOfMemoryError: GC overhead limit exceeded
at scala.collection.mutable.ListBuffer.$plus$eq(ListBuffer.scala:174)
at scala.collection.mutable.ListBuffer.$plus$eq(ListBuffer.scala:45)
at scala.collection.generic.Growable$class.loop$1(Growable.scala:53)
at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:57)
当我们监控驱动程序上的日志时,我们发现它正在不断地进行完全垃圾收集,并且整体内存逐渐增加:
2.095: [GC [PSYoungGen: 64512K->8399K(74752K)] 64512K->8407K(244224K), 0.0289150 secs] [Times: user=0.05 sys=0.02, real=0.02 secs]
3.989: [GC [PSYoungGen: 72911K->10235K(139264K)] 72919K->10709K(308736K), 0.0257280 secs] [Times: user=0.04 sys=0.02, real=0.02 secs]
5.936: [GC [PSYoungGen: 139259K->10231K(139264K)] 139733K->67362K(308736K), 0.0741340 secs] [Times: user=0.40 sys=0.12, real=0.07 secs]
10.842: [GC [PSYoungGen: 139255K->10231K(268288K)] 196386K->86311K(437760K), 0.0678030 secs] [Times: user=0.28 sys=0.07, real=0.07 secs]
19.282: [GC [PSYoungGen: 268279K->10236K(268288K)] 344359K->122829K(437760K), 0.0642890 secs] [Times: user=0.32 sys=0.10, real=0.06 secs]
22.981: [GC [PSYoungGen: 268284K->30989K(289792K)] 380877K->143582K(459264K), 0.0811960 secs] [Times: user=0.20 sys=0.07, real=0.08 secs]
有人知道发生了什么事吗?非常感谢任何解释或文档指针。
最佳答案
很难确定,但我猜这与 DataFrame 中的分区总数有关,这是减少的结果,而且这个数字可能比你拥有更多的单元,因为 a.union(b)
中的分区数是 a
和 b
的分区数之和。
虽然数据不存储/发送给驱动程序,驱动程序确实管理代表所有分区的对象和分配给每个分区的任务其中一个;如果您的 DataFrame 最终有数百万个分区,Driver 将创建(然后使用 GC 收集)数百万个对象。
因此,尝试更改并集操作以包含coalesce
操作以限制分区总数:
val MaxParts = dailyData.rdd.partitions.length * 2 // or anything, but something reasonable
val report: DataFrame =
units.map(unitReport(date, dailyData, _))
.reduce((a, b) => a.union(b).coalesce(MaxParts))
关于scala - `maxResultSize` 包含什么以及所有驱动程序内存在哪里,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/47912774/
我们正在使用 Apache Spark 2.1.1 生成一些每日报告。这些报告是根据一些日常数据生成的,我们在分别运行每个单元的报告并将它们合并在一起之前坚持这些数据。这是我们正在做的事情的简化版本:
我们正在使用 Apache Spark 2.1.1 生成一些每日报告。这些报告是根据一些日常数据生成的,我们在分别运行每个单元的报告并将它们合并在一起之前坚持这些数据。这是我们正在做的事情的简化版本:
我正在运行 Spark 作业来聚合数据。我有一个名为 Profile 的自定义数据结构,它基本上包含一个 mutable.HashMap[Zone, Double] .我想使用以下代码合并共享给定 k
美好的一天。 我正在运行用于解析某些日志文件的开发代码。如果我尝试解析更少的文件,我的代码将流畅运行。但是随着我需要解析的日志文件数量的增加,它将返回不同的错误,例如too many open fil
ref说: Limit of total size of serialized results of all partitions for each Spark action (e.g. collec
我有一个执行大型连接的 Spark 应用程序 val joined = uniqueDates.join(df, $"start_date" <= $"date" && $"date" <= $"en
我知道当您在 pyspark 中处于客户端模式时,您无法在脚本中设置配置,因为 JVM 会在加载库后立即启动。 因此,设置配置的方法是实际去编辑启动它的 shell 脚本:spark-env.sh..
我正在使用 Spark 1.4 进行研究,并在内存设置方面遇到了困难。我的机器有 16GB 的内存,所以没有问题,因为我的文件大小只有 300MB。虽然,当我尝试使用 toPandas() 函数将 S
我正在使用 dynamo db 分页,基于 AWS 文档: --> maxResultSize为本次查询最多检索的资源数,包括检索的所有页面的所有资源。 --> maxPageSize是单个页面最多检
我在 emr 中使用 Jupyter notebook 来处理大量数据。在处理数据时,我看到了这个错误: An error occurred while calling z:org.apache.sp
我正在尝试将 spark.driver.maxResultSize 值更新为 6g,但该值未得到更新。 spark.conf.set("spark.driver.maxResultSize", '6g
我正在尝试将 spark.driver.maxResultSize 值更新为 6g,但该值未得到更新。 spark.conf.set("spark.driver.maxResultSize", '6g
当我将 --conf spark.driver.maxResultSize=2050 添加到我的 spark-submit 命令时,出现以下错误。 17/12/27 18:33:19 ERROR Tr
错误: org.apache.spark.SparkException: Job aborted due to stage failure: Total size of serialized resu
错误: ERROR TaskSetManager: Total size of serialized results of XXXX tasks (2.0 GB) is bigger than spa
我是一名优秀的程序员,十分优秀!