gpt4 book ai didi

memory - 这是 Spark 流的错误还是内存泄漏?

转载 作者:IT王子 更新时间:2023-10-28 23:36:14 25 4
gpt4 key购买 nike

我将代码提交到 spark 独立集群。提交命令如下:

nohup ./bin/spark-submit  \  
--master spark://ES01:7077 \
--executor-memory 4G \
--num-executors 1 \
--total-executor-cores 1 \
--conf "spark.storage.memoryFraction=0.2" \
./myCode.py 1>a.log 2>b.log &

我在上面的命令中指定执行器使用 4G 内存。但是使用 top 命令监控 executor 进程,我注意到内存使用量一直在增长。现在顶部的命令输出如下:

  PID USER      PR  NI    VIRT    RES    SHR S  %CPU %MEM     TIME+ COMMAND                                                                                                                                                    
12578 root 20 0 20.223g 5.790g 23856 S 61.5 37.3 20:49.36 java

我的总内存是 16G,所以 37.3% 已经比我指定的 4GB 大了。而且还在增长。

使用ps命令,可以知道是executor进程。

[root@ES01 ~]# ps -awx | grep spark | grep java
10409 ? Sl 1:43 java -cp /opt/spark-1.6.0-bin-hadoop2.6/conf/:/opt/spark-1.6.0-bin-hadoop2.6/lib/spark-assembly-1.6.0-hadoop2.6.0.jar:/opt/spark-1.6.0-bin-hadoop2.6/lib/datanucleus-api-jdo-3.2.6.jar:/opt/spark-1.6.0-bin-hadoop2.6/lib/datanucleus-rdbms-3.2.9.jar:/opt/spark-1.6.0-bin-hadoop2.6/lib/datanucleus-core-3.2.10.jar:/opt/hadoop-2.6.2/etc/hadoop/ -Xms4G -Xmx4G -XX:MaxPermSize=256m org.apache.spark.deploy.master.Master --ip ES01 --port 7077 --webui-port 8080
10603 ? Sl 6:16 java -cp /opt/spark-1.6.0-bin-hadoop2.6/conf/:/opt/spark-1.6.0-bin-hadoop2.6/lib/spark-assembly-1.6.0-hadoop2.6.0.jar:/opt/spark-1.6.0-bin-hadoop2.6/lib/datanucleus-api-jdo-3.2.6.jar:/opt/spark-1.6.0-bin-hadoop2.6/lib/datanucleus-rdbms-3.2.9.jar:/opt/spark-1.6.0-bin-hadoop2.6/lib/datanucleus-core-3.2.10.jar:/opt/hadoop-2.6.2/etc/hadoop/ -Xms4G -Xmx4G -XX:MaxPermSize=256m org.apache.spark.deploy.worker.Worker --webui-port 8081 spark://ES01:7077
12420 ? Sl 10:16 java -cp /opt/spark-1.6.0-bin-hadoop2.6/conf/:/opt/spark-1.6.0-bin-hadoop2.6/lib/spark-assembly-1.6.0-hadoop2.6.0.jar:/opt/spark-1.6.0-bin-hadoop2.6/lib/datanucleus-api-jdo-3.2.6.jar:/opt/spark-1.6.0-bin-hadoop2.6/lib/datanucleus-rdbms-3.2.9.jar:/opt/spark-1.6.0-bin-hadoop2.6/lib/datanucleus-core-3.2.10.jar:/opt/hadoop-2.6.2/etc/hadoop/ -Xms1g -Xmx1g -XX:MaxPermSize=256m org.apache.spark.deploy.SparkSubmit --master spark://ES01:7077 --conf spark.storage.memoryFraction=0.2 --executor-memory 4G --num-executors 1 --total-executor-cores 1 /opt/flowSpark/sparkStream/ForAsk01.py
12578 ? Sl 21:03 java -cp /opt/spark-1.6.0-bin-hadoop2.6/conf/:/opt/spark-1.6.0-bin-hadoop2.6/lib/spark-assembly-1.6.0-hadoop2.6.0.jar:/opt/spark-1.6.0-bin-hadoop2.6/lib/datanucleus-api-jdo-3.2.6.jar:/opt/spark-1.6.0-bin-hadoop2.6/lib/datanucleus-rdbms-3.2.9.jar:/opt/spark-1.6.0-bin-hadoop2.6/lib/datanucleus-core-3.2.10.jar:/opt/hadoop-2.6.2/etc/hadoop/ -Xms4096M -Xmx4096M -Dspark.driver.port=52931 -XX:MaxPermSize=256m org.apache.spark.executor.CoarseGrainedExecutorBackend --driver-url spark://CoarseGrainedScheduler@10.79.148.184:52931 --executor-id 0 --hostname 10.79.148.184 --cores 1 --app-id app-20160511080701-0013 --worker-url spark://Worker@10.79.148.184:52660

下面是代码。这很简单,所以我认为没有内存泄漏

if __name__ == "__main__":

dataDirectory = '/stream/raw'

sc = SparkContext(appName="Netflow")
ssc = StreamingContext(sc, 20)

# Read CSV File
lines = ssc.textFileStream(dataDirectory)

lines.foreachRDD(process)

ssc.start()
ssc.awaitTermination()

处理函数的代码如下。请注意,我在这里使用的是 HiveContext 而不是 SqlContext。因为SqlContext不支持窗口功能

def getSqlContextInstance(sparkContext):
if ('sqlContextSingletonInstance' not in globals()):
globals()['sqlContextSingletonInstance'] = HiveContext(sparkContext)
return globals()['sqlContextSingletonInstance']

def process(time, rdd):

if rdd.isEmpty():
return sc.emptyRDD()

sqlContext = getSqlContextInstance(rdd.context)

# Convert CSV File to Dataframe
parts = rdd.map(lambda l: l.split(","))
rowRdd = parts.map(lambda p: Row(router=p[0], interface=int(p[1]), flow_direction=p[9], bits=int(p[11])))
dataframe = sqlContext.createDataFrame(rowRdd)

# Get the top 2 interface of each router
dataframe = dataframe.groupBy(['router','interface']).agg(func.sum('bits').alias('bits'))
windowSpec = Window.partitionBy(dataframe['router']).orderBy(dataframe['bits'].desc())
rank = func.dense_rank().over(windowSpec)
ret = dataframe.select(dataframe['router'],dataframe['interface'],dataframe['bits'], rank.alias('rank')).filter("rank<=2")

ret.show()
dataframe.show()

其实我发现下面的代码会导致问题:

    # Get the top 2 interface of each router
dataframe = dataframe.groupBy(['router','interface']).agg(func.sum('bits').alias('bits'))
windowSpec = Window.partitionBy(dataframe['router']).orderBy(dataframe['bits'].desc())
rank = func.dense_rank().over(windowSpec)
ret = dataframe.select(dataframe['router'],dataframe['interface'],dataframe['bits'], rank.alias('rank')).filter("rank<=2")
ret.show()

因为如果我删除这 5 行。该代码可以运行一整夜而不会显示内存增加。但是添加它们会导致executor的内存使用量增长到非常高。

基本上,上面的代码只是 SparkSQL 中的一些窗口 + grouby。那么这是一个错误吗?

最佳答案

Disclaimer: this answer isn't based on debugging, but more on observations and the documentation Apache Spark provides

我不认为这是一个错误!

查看您的配置,我们可以看到您主要关注执行器调优,这并没有错,但您忘记了等式中的驱动程序部分。

查看 Apache Spark documentaion 中的 spark 集群概述

enter image description here

如您所见,每个工作程序都有一个执行程序,但是,在您的情况下,工作程序节点与驱动程序节点相同!坦率地说,当您在本地运行或在单个节点的独立集群上运行时就是这种情况。

此外,驱动程序默认占用 1G 内存,除非使用 spark.driver.memory 标志进行调整。此外,您不应该忘记 JVM 本身的堆使用情况,以及由驱动程序处理的 Web UI AFAIK!

当您删除您提到的代码行时,您的代码将没有 actions 因为 map 函数只是一个转换,因此不会执行,并且因此,您根本看不到内存增加!

同样适用于 groupBy 因为它只是一个不会执行的转换,除非正在调用一个 Action ,在你的情况下是 aggshow 更进一步!

也就是说,如果您想控制此进程的核心数量,请尽量减少驱动程序内存和 spark 中由 spark.cores.max 定义的核心总数,然后级联到执行者。此外,我会添加 spark.python.profile.dump到您的配置列表中,以便您可以查看 Spark 作业执行的配置文件,这可以帮助您更多地了解案例,并根据您的需要调整集群。

关于memory - 这是 Spark 流的错误还是内存泄漏?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/37152501/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com