gpt4 book ai didi

python - 覆盖数据帧变量时引发内存泄漏

转载 作者:太空宇宙 更新时间:2023-11-04 02:49:56 33 4
gpt4 key购买 nike

我在 spark 驱动程序中遇到了这个内存泄漏,我似乎无法弄清楚原因。我的猜测是它与尝试覆盖 DataFrame 变量有关,但我找不到任何文档或其他类似问题。

这是在 Spark 2.1.0 (PySpark) 上。

from pyspark.sql import SparkSession
from pyspark.sql import SQLContext
from pyspark.sql.types import *

spark = SparkSession \
.builder \
.appName("Spark Leak") \
.getOrCreate()

sc = spark.sparkContext
sqlContext = SQLContext.getOrCreate(sc)

items = 5000000
data = [str(x) for x in range(1,items)]

df = sqlContext.createDataFrame(data, StringType())
print(df.count())

for x in range(0,items):
sub_df = sqlContext.createDataFrame([str(x)], StringType())
df = df.subtract(sub_df)

print(df.count())

这将继续运行,直到驱动程序内存不足然后死掉。

java.net.SocketException: Connection reset
at java.net.SocketInputStream.read(SocketInputStream.java:210)
at java.net.SocketInputStream.read(SocketInputStream.java:141)
at java.net.SocketInputStream.read(SocketInputStream.java:224)
at org.apache.spark.api.python.PythonAccumulatorV2.merge(PythonRDD.scala:917)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$updateAccumulators$1.apply(DAGScheduler.scala:1089)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$updateAccumulators$1.apply(DAGScheduler.scala:1081)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
at org.apache.spark.scheduler.DAGScheduler.updateAccumulators(DAGScheduler.scala:1081)
at org.apache.spark.scheduler.DAGScheduler.handleTaskCompletion(DAGScheduler.scala:1184)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1717)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1675)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1664)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
17/05/25 16:55:40 ERROR DAGScheduler: Failed to update accumulators for task 13
java.net.SocketException: Broken pipe (Write failed)
at java.net.SocketOutputStream.socketWrite0(Native Method)
at java.net.SocketOutputStream.socketWrite(SocketOutputStream.java:111)
at java.net.SocketOutputStream.write(SocketOutputStream.java:155)
at java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:82)
at java.io.BufferedOutputStream.flush(BufferedOutputStream.java:140)
at java.io.DataOutputStream.flush(DataOutputStream.java:123)
at org.apache.spark.api.python.PythonAccumulatorV2.merge(PythonRDD.scala:915)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$updateAccumulators$1.apply(DAGScheduler.scala:1089)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$updateAccumulators$1.apply(DAGScheduler.scala:1081)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
at org.apache.spark.scheduler.DAGScheduler.updateAccumulators(DAGScheduler.scala:1081)
at org.apache.spark.scheduler.DAGScheduler.handleTaskCompletion(DAGScheduler.scala:1184)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1717)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1675)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1664)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
...

如果有的话,我认为内存应该减少,因为项目正在从 DataFrame 中删除,但事实并非如此。

我是不是不明白 spark 如何将 DataFrames 分配给 Python 变量之类的?

我还尝试将 df.subtract 分配给一个新的临时变量,然后取消持久化 df 然后将临时变量分配给 df 并取消保留临时变量,但这也有同样的问题。

最佳答案

这里的根本问题似乎是理解 DataFrame 到底是什么(这也适用于 Spark RDD)。当对给定对象执行某些操作时,本地 DataFrame 对象有效地描述了要执行的计算。

因此它是一个递归结构,它捕获了它的所有依赖项。每次迭代的有效执行计划。虽然 Spark 提供了工具,例如 checkpointing,可用于解决此问题并切断沿袭,但有问题的代码首先没有多大意义。

Spark 中可用的分布式数据结构专为高延迟、IO 密集型作业而设计。并行化单个对象、在数百万个分布式对象上执行数百万个 Spark 作业都无法正常工作。

此外,Spark 并不是为高效的单项操作而设计的。每个 subtract 都是 O(N) 使整个过程 O(N2),并且在任何情况下都没有用大数据集。

虽然本身很简单,但正确的方法应该是这样的:

items = 5000000

df1 = spark.range(items).selectExpr("cast(id as string)")
df2 = spark.range(items).selectExpr("cast(id as string)")
df1.subtract(df2)

关于python - 覆盖数据帧变量时引发内存泄漏,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/44186114/

33 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com