- r - 以节省内存的方式增长 data.frame
- ruby-on-rails - ruby/ruby on rails 内存泄漏检测
- android - 无法解析导入android.support.v7.app
- UNIX 域套接字与共享内存(映射文件)
我将代码提交到 spark 独立集群。提交命令如下:
nohup ./bin/spark-submit \
--master spark://ES01:7077 \
--executor-memory 4G \
--num-executors 1 \
--total-executor-cores 1 \
--conf "spark.storage.memoryFraction=0.2" \
./myCode.py 1>a.log 2>b.log &
我在上面的命令中指定执行器使用 4G 内存。但是使用 top 命令监控 executor 进程,我注意到内存使用量一直在增长。现在顶部的命令输出如下:
PID USER PR NI VIRT RES SHR S %CPU %MEM TIME+ COMMAND
12578 root 20 0 20.223g 5.790g 23856 S 61.5 37.3 20:49.36 java
我的总内存是 16G,所以 37.3% 已经比我指定的 4GB 大了。而且还在增长。
使用ps命令,可以知道是executor进程。
[root@ES01 ~]# ps -awx | grep spark | grep java
10409 ? Sl 1:43 java -cp /opt/spark-1.6.0-bin-hadoop2.6/conf/:/opt/spark-1.6.0-bin-hadoop2.6/lib/spark-assembly-1.6.0-hadoop2.6.0.jar:/opt/spark-1.6.0-bin-hadoop2.6/lib/datanucleus-api-jdo-3.2.6.jar:/opt/spark-1.6.0-bin-hadoop2.6/lib/datanucleus-rdbms-3.2.9.jar:/opt/spark-1.6.0-bin-hadoop2.6/lib/datanucleus-core-3.2.10.jar:/opt/hadoop-2.6.2/etc/hadoop/ -Xms4G -Xmx4G -XX:MaxPermSize=256m org.apache.spark.deploy.master.Master --ip ES01 --port 7077 --webui-port 8080
10603 ? Sl 6:16 java -cp /opt/spark-1.6.0-bin-hadoop2.6/conf/:/opt/spark-1.6.0-bin-hadoop2.6/lib/spark-assembly-1.6.0-hadoop2.6.0.jar:/opt/spark-1.6.0-bin-hadoop2.6/lib/datanucleus-api-jdo-3.2.6.jar:/opt/spark-1.6.0-bin-hadoop2.6/lib/datanucleus-rdbms-3.2.9.jar:/opt/spark-1.6.0-bin-hadoop2.6/lib/datanucleus-core-3.2.10.jar:/opt/hadoop-2.6.2/etc/hadoop/ -Xms4G -Xmx4G -XX:MaxPermSize=256m org.apache.spark.deploy.worker.Worker --webui-port 8081 spark://ES01:7077
12420 ? Sl 10:16 java -cp /opt/spark-1.6.0-bin-hadoop2.6/conf/:/opt/spark-1.6.0-bin-hadoop2.6/lib/spark-assembly-1.6.0-hadoop2.6.0.jar:/opt/spark-1.6.0-bin-hadoop2.6/lib/datanucleus-api-jdo-3.2.6.jar:/opt/spark-1.6.0-bin-hadoop2.6/lib/datanucleus-rdbms-3.2.9.jar:/opt/spark-1.6.0-bin-hadoop2.6/lib/datanucleus-core-3.2.10.jar:/opt/hadoop-2.6.2/etc/hadoop/ -Xms1g -Xmx1g -XX:MaxPermSize=256m org.apache.spark.deploy.SparkSubmit --master spark://ES01:7077 --conf spark.storage.memoryFraction=0.2 --executor-memory 4G --num-executors 1 --total-executor-cores 1 /opt/flowSpark/sparkStream/ForAsk01.py
12578 ? Sl 21:03 java -cp /opt/spark-1.6.0-bin-hadoop2.6/conf/:/opt/spark-1.6.0-bin-hadoop2.6/lib/spark-assembly-1.6.0-hadoop2.6.0.jar:/opt/spark-1.6.0-bin-hadoop2.6/lib/datanucleus-api-jdo-3.2.6.jar:/opt/spark-1.6.0-bin-hadoop2.6/lib/datanucleus-rdbms-3.2.9.jar:/opt/spark-1.6.0-bin-hadoop2.6/lib/datanucleus-core-3.2.10.jar:/opt/hadoop-2.6.2/etc/hadoop/ -Xms4096M -Xmx4096M -Dspark.driver.port=52931 -XX:MaxPermSize=256m org.apache.spark.executor.CoarseGrainedExecutorBackend --driver-url spark://CoarseGrainedScheduler@10.79.148.184:52931 --executor-id 0 --hostname 10.79.148.184 --cores 1 --app-id app-20160511080701-0013 --worker-url spark://Worker@10.79.148.184:52660
下面是代码。这很简单,所以我认为没有内存泄漏
if __name__ == "__main__":
dataDirectory = '/stream/raw'
sc = SparkContext(appName="Netflow")
ssc = StreamingContext(sc, 20)
# Read CSV File
lines = ssc.textFileStream(dataDirectory)
lines.foreachRDD(process)
ssc.start()
ssc.awaitTermination()
处理函数的代码如下。请注意,我在这里使用的是 HiveContext 而不是 SqlContext。因为SqlContext不支持窗口功能
def getSqlContextInstance(sparkContext):
if ('sqlContextSingletonInstance' not in globals()):
globals()['sqlContextSingletonInstance'] = HiveContext(sparkContext)
return globals()['sqlContextSingletonInstance']
def process(time, rdd):
if rdd.isEmpty():
return sc.emptyRDD()
sqlContext = getSqlContextInstance(rdd.context)
# Convert CSV File to Dataframe
parts = rdd.map(lambda l: l.split(","))
rowRdd = parts.map(lambda p: Row(router=p[0], interface=int(p[1]), flow_direction=p[9], bits=int(p[11])))
dataframe = sqlContext.createDataFrame(rowRdd)
# Get the top 2 interface of each router
dataframe = dataframe.groupBy(['router','interface']).agg(func.sum('bits').alias('bits'))
windowSpec = Window.partitionBy(dataframe['router']).orderBy(dataframe['bits'].desc())
rank = func.dense_rank().over(windowSpec)
ret = dataframe.select(dataframe['router'],dataframe['interface'],dataframe['bits'], rank.alias('rank')).filter("rank<=2")
ret.show()
dataframe.show()
其实我发现下面的代码会导致问题:
# Get the top 2 interface of each router
dataframe = dataframe.groupBy(['router','interface']).agg(func.sum('bits').alias('bits'))
windowSpec = Window.partitionBy(dataframe['router']).orderBy(dataframe['bits'].desc())
rank = func.dense_rank().over(windowSpec)
ret = dataframe.select(dataframe['router'],dataframe['interface'],dataframe['bits'], rank.alias('rank')).filter("rank<=2")
ret.show()
因为如果我删除这 5 行。该代码可以运行一整夜而不会显示内存增加。但是添加它们会导致executor的内存使用量增长到非常高。
基本上,上面的代码只是 SparkSQL 中的一些窗口 + grouby。那么这是一个错误吗?
最佳答案
Disclaimer: this answer isn't based on debugging, but more on observations and the documentation Apache Spark provides
我不认为这是一个错误!
查看您的配置,我们可以看到您主要关注执行器调优,这并没有错,但您忘记了等式中的驱动程序部分。
查看 Apache Spark documentaion 中的 spark 集群概述
如您所见,每个工作程序都有一个执行程序,但是,在您的情况下,工作程序节点与驱动程序节点相同!坦率地说,当您在本地运行或在单个节点的独立集群上运行时就是这种情况。
此外,驱动程序默认占用 1G 内存,除非使用 spark.driver.memory
标志进行调整。此外,您不应该忘记 JVM 本身的堆使用情况,以及由驱动程序处理的 Web UI AFAIK!
当您删除您提到的代码行时,您的代码将没有 actions 因为 map
函数只是一个转换,因此不会执行,并且因此,您根本看不到内存增加!
同样适用于 groupBy
因为它只是一个不会执行的转换,除非正在调用一个 Action ,在你的情况下是 agg
和 show
更进一步!
也就是说,如果您想控制此进程的核心数量,请尽量减少驱动程序内存和 spark 中由 spark.cores.max
定义的核心总数,然后级联到执行者。此外,我会添加 spark.python.profile.dump
到您的配置列表中,以便您可以查看 Spark 作业执行的配置文件,这可以帮助您更多地了解案例,并根据您的需要调整集群。
关于memory - 这是 Spark 流的错误还是内存泄漏?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/37152501/
IntentReceiver 正在泄漏 由于 onDetachedFromWindow 在某些情况下未被调用。 @Override protected void onDetachedFromWind
好吧,我很难追踪这个内存泄漏。运行此脚本时,我没有看到任何内存泄漏,但我的 objectalloc 正在攀升。 Instruments 指向 CGBitmapContextCreateImage >
我编写了一个测试代码来检查如何使用 Instrument(Leaks)。我创建了一个单一 View 应用程序,单击按钮后我加载了一个像这样的新 View ... - (IBAction)btn_clk
我正在使用这个简单的代码并观察单调增加的内存使用量。我正在使用这个小模块将内容转储到磁盘。我观察到它发生在 unicode 字符串上而不是整数上,我做错了什么吗? 当我这样做时: >>> from u
我有以下泄漏的代码。 Instruments 表示,泄漏的是 rssParser 对象。我“刷新”了 XML 提要,它运行了该 block 并且发生了泄漏...... 文件.h @interface
我在我编写的以下代码片段中发现了内存泄漏 NSFileManager *fileManager=[[NSFileManager alloc] init]; fileList=[[fileManager
因此,我正在开发HTML5 / javascript rts游戏。观察一直有几种声音在播放。因此,对我来说,是一段时间后声音听起来像是“崩溃”,并且此浏览器选项卡上的所有声音都停止了工作。我只能通过重
下面是我正在使用的一段代码及其输出。 my $handle; my $enterCount = Devel::Leak::NoteSV($handle); print "$date entry $en
在这篇关于 go-routines 泄漏的帖子之后,https://www.ardanlabs.com/blog/2018/11/goroutine-leaks-the-forgotten-sende
我想知道为什么在执行 ./a.out 后随机得到以下结果。有什么想法我做错了吗?谢谢 http://img710.imageshack.us/img710/8708/trasht.png 最佳答案 正
我正在 Swift 中开发一个应用程序,在呈现捕获我放在一起的二维码的自定义 ViewController 后,我注意到出现了巨大的内存跳跃。 该代码本质上基于以下示例:http://www.appc
下面是我的 javascript 代码片段。它没有按预期运行,请帮我解决这个问题。 function getCurrentLocation() { console.log("insi
我们在生产环境中部署了 3 个代理 Kafka 0.10.1.0。有些应用程序嵌入了 Kafka Producer,它们将应用程序日志发送到某个主题。该主题有 10 个分区,复制因子为 3。 我们观察
我正在使用仪器来检测一些泄漏,但有一些泄漏我无法解决; NSMutableString *textedetails = [[NSMutableString alloc] init];
如果我使用性能工具测试我的代码 - 泄漏,它没有检测到任何泄漏。这是否意味着代码没有泄漏任何内存? 我有一个越狱的 iPhone,我可以监控可用内存。如果有人知道,那就是 SBSettings。我测试
我在从 AddressBook 中获取图像时遇到了很大的问题,下面我粘贴了我的代码。此 imageData 从未被释放,在我的 Allocations Instruments 上它看起来总是在内存中它
- (NSMutableArray *)getArrayValue:(NSArray *)array{ NSMutableArray *valueArray = [NSMutableArra
Instruments 工具说这是一个泄漏,有什么想法吗? 我在 for 循环结束时释放变量对象 在上述方法的开头,这就是我设置变量对象的方式,即自动释放; NSMutableArray *varia
我正在跟踪我的 iOS 应用程序的内存泄漏,我有一个奇怪的泄漏导致我的应用程序崩溃......负责的框架是:CGImageMergeXMPPropsWhithLegacyProps。在某些时候,我的应
我正在尝试使用 NSOperationQueue 在后台线程中执行一个方法,如下所示: NSOperationQueue *queue = [NSOperationQueue new]; NS
我是一名优秀的程序员,十分优秀!