gpt4 book ai didi

python - PySpark:Numpy 内存未在执行程序映射分区函数中释放(内存泄漏)

转载 作者:太空狗 更新时间:2023-10-30 00:15:49 25 4
gpt4 key购买 nike

我有以下最小工作示例:

from pyspark import SparkContext
from pyspark.sql import SQLContext
import numpy as np

sc = SparkContext()
sqlContext = SQLContext(sc)

# Create dummy pySpark DataFrame with 1e5 rows and 16 partitions
df = sqlContext.range(0, int(1e5), numPartitions=16)

def toy_example(rdd):

# Read in pySpark DataFrame partition
data = list(rdd)

# Generate random data using Numpy
rand_data = np.random.random(int(1e7))

# Apply the `int` function to each element of `rand_data`
for i in range(len(rand_data)):
e = rand_data[i]
int(e)

# Return a single `0` value
return [[0]]

# Execute the above function on each partition (16 partitions)
result = df.rdd.mapPartitions(toy_example)
result = result.collect()

当上面的代码运行时,执行者的 Python 进程的内存在每次迭代后稳定增加,这表明前一次迭代的内存没有被释放——即内存泄漏。如果内存超过执行程序的内存限制,这可能会导致作业失败 - 见下文:

enter image description here

奇怪的是,以下任何一项都可以防止内存泄漏:

  • 删除行 data = list(rdd)
  • rand_data = np.random.random(int(1e7)) 之后插入行 rand_data = list(rand_data.tolist())
  • 删除行 int(e)

以上代码是无法使用上述修复的更大项目的最小工作示例。

注意事项:

  • 虽然函数中未使用 rdd 数据,但需要该行来重现泄漏。在实际项目中,使用了 rdd 数据。
  • 内存泄漏可能是由于未释放大型 Numpy 数组 rand_data
  • 您必须对 rand_data 的每个元素执行 int 操作以重现泄漏 🤷

问题

能否通过在toy_example函数的前几行或后几行插入代码,强制PySpark执行器释放rand_data的内存?

已经尝试过的

通过在函数末尾插入来强制垃圾回收:

del data, rand_data
import gc
gc.collect()

通过在函数的末尾或开头插入来强制释放内存(受 Pandas issue 启发):

from ctypes import cdll, CDLL
cdll.LoadLibrary("libc.so.6")
libc = CDLL("libc.so.6")
libc.malloc_trim(0)

设置、测量和版本

以下 PySpark 作业在具有一个 m4.xlarge 工作节点的 AWS EMR 集群上运行。 Numpy 必须通过 pip 安装在工作节点上 bootstrapping .

使用以下函数测量执行程序的内存(打印到执行程序的日志):

import resource
resource.getrusage(resource.RUSAGE_SELF).ru_maxrss

Spark 提交配置:

  • spark.executor.instances = 1
  • spark.executor.cores = 1
  • spark.executor.memory = 6g
  • spark.master = yarn
  • spark.dynamicAllocation.enabled = false

版本:

  • 电子病历 5.12.1
  • 星火 2.2.1
  • python 2.7.13
  • NumPy 1.14.0

最佳答案

我们最近遇到了一个非常相似的问题,我们也无法通过更改代码来强制释放内存。然而,对我们有用的是使用以下 Spark 选项:spark.python.worker.reuse = False

关于python - PySpark:Numpy 内存未在执行程序映射分区函数中释放(内存泄漏),我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/53105508/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com