gpt4 book ai didi

python - 为什么来自 s3 的 dask read_csv 保留了这么多内存?

转载 作者:太空狗 更新时间:2023-10-29 17:20:40 25 4
gpt4 key购买 nike

我正在使用 dask(SQL 查询的替代品)从 s3 读取一些压缩数据。但是,看起来有一些数据文件的缓存,或者在系统内存中某处保存的解压缩文件。注意,这应该是可运行的,这里的测试数据来自公共(public) s3 存储桶中的 pandas 测试套件。

import dask.dataframe as dd
import pandas as pd
import psutil as ps
import os

#for easier vis
mb = 1048576

def mytestfunc(file):
process = ps.Process(os.getpid())

print('initial memory: {0}'.format(process.memory_info().rss/mb))
data = dd.read_csv(file, compression = 'gzip', blocksize = None, storage_options = {'anon':True})

print('dask plan memory: {0}'.format(process.memory_info().rss/mb))

data = data.compute()
print('data in memory: {0}'.format(process.memory_info().rss/mb))
print('data frame usage: {0}'.format(data.memory_usage(deep=True).sum()/mb))
return data

process = ps.Process(os.getpid())
print('before function call: {0}'.format(process.memory_info().rss/mb))
out = mytestfunc('s3://pandas-test/large_random.csv.gz')
print('After function call: {0}'.format(process.memory_info().rss/mb))
# out = mytestfunc('s3://pandas-test/tips.csv.gz')
# print('After smaller function call: {0}'.format(process.memory_info().rss/mb))

这给了我:

before function call: 76.984375
initial memory: 76.984375
dask plan memory: 92.9921875
data in memory: 224.71484375
data frame usage: 38.14704895019531
After function call: 224.7265625

天真地,我希望“函数调用后”是“函数调用前”加上数据帧和一些开销。在这里,gzip 是 43mb,导致大约 90mb 的开销,在我的真实示例中,这个额外的部分是 10gb 数据帧的大约 50gb 额外内存。

如果您在另一个较小的文件上重新运行,您可以看到内存已释放 - 取消对较小文件重新运行的注释以查看它。这也表明增加是由文件大小引起的 - 您可以切换顺序并先运行“提示”,内存保持在 ~90mb。

我猜 dask、s3fs 或 pandas 正在将文件或解压缩的内容保存在某个缓冲区中,但我无法找到它来清除它。

关于如何减少内存使用或释放​​缓冲区的任何想法?

编辑:我的一些真实数据的上述输出示例 - 32 个 gzip 文件:

before function call: 70.69921875
initial memory: 70.69921875
dask plan memory: 80.16015625
data in memory: 33991.69921875
data frame usage: 10824.553115844727
After function call: 33991.69921875

我知道 dask 在相同的 32 个文件上比 pandas 循环有更高的内存使用峰值,但我仍然不明白为什么它没有被释放。

最佳答案

在线程中使用 pandas.read_csv 时,Python 进程似乎泄漏了一点内存。我已将其简化为 pandas.read_csvconcurrent.futures.ThreadPoolExecutor 的问题。这是在 Pandas 问题跟踪器上提出的:https://github.com/pandas-dev/pandas/issues/19941

# imports
import pandas as pd
import numpy as np
import time
import psutil
from concurrent.futures import ThreadPoolExecutor

# prep
process = psutil.Process()
e = ThreadPoolExecutor(8)

# prepare csv file, only need to run once
pd.DataFrame(np.random.random((100000, 50))).to_csv('large_random.csv')


# baseline computation making pandas dataframes with threasds. This works fine

def f(_):
return pd.DataFrame(np.random.random((1000000, 50)))

print('before:', process.memory_info().rss // 1e6, 'MB')
list(e.map(f, range(8)))
time.sleep(1) # let things settle
print('after:', process.memory_info().rss // 1e6, 'MB')

# before: 57.0 MB
# after: 56.0 MB

# example with read_csv, this leaks memory
print('before:', process.memory_info().rss // 1e6, 'MB')
list(e.map(pd.read_csv, ['large_random.csv'] * 8))
time.sleep(1) # let things settle
print('after:', process.memory_info().rss // 1e6, 'MB')

# before: 58.0 MB
# after: 323.0 MB

关于python - 为什么来自 s3 的 dask read_csv 保留了这么多内存?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/48954080/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com