gpt4 book ai didi

python - 在 EC2 上运行大数据计算时出现 dask.async.MemoryError

转载 作者:太空宇宙 更新时间:2023-11-03 16:21:04 25 4
gpt4 key购买 nike

我有一个 m4.4xlarge(64 GB 内存)EC2 盒子。我正在与 Pandas 一起运行 dask。我收到以下内存错误。

我在运行大约 24 小时后收到此消息,这大约是完成任务所需的时间,因此我不确定错误是否是由于 RAM 不足、磁盘内存不足导致的执行 DF.to_csv() 将大 DF 写入磁盘或 pandas/numpy 内部内存限制?

raise(remote_exception(res, tb))
dask.async.MemoryError:

Traceback
---------
File "/home/ec2-user/anaconda2/lib/python2.7/site-packages/dask/async.py", line 267, in execute_task
result = _execute_task(task, data)
File "/home/ec2-user/anaconda2/lib/python2.7/site-packages/dask/async.py", line 248, in _execute_task
args2 = [_execute_task(a, cache) for a in args]
File "/home/ec2-user/anaconda2/lib/python2.7/site-packages/dask/async.py", line 249, in _execute_task
return func(*args2)
File "/home/ec2-user/anaconda2/lib/python2.7/site-packages/pandas/core/frame.py", line 4061, in apply
return self._apply_standard(f, axis, reduce=reduce)
File "/home/ec2-user/anaconda2/lib/python2.7/site-packages/pandas/core/frame.py", line 4179, in _apply_standard
result = result._convert(datetime=True, timedelta=True, copy=False)
File "/home/ec2-user/anaconda2/lib/python2.7/site-packages/pandas/core/generic.py", line 3004, in _convert
copy=copy)).__finalize__(self)
File "/home/ec2-user/anaconda2/lib/python2.7/site-packages/pandas/core/internals.py", line 2941, in convert
return self.apply('convert', **kwargs)
File "/home/ec2-user/anaconda2/lib/python2.7/site-packages/pandas/core/internals.py", line 2901, in apply
bm._consolidate_inplace()
File "/home/ec2-user/anaconda2/lib/python2.7/site-packages/pandas/core/internals.py", line 3278, in _consolidate_inplace
self.blocks = tuple(_consolidate(self.blocks))
File "/home/ec2-user/anaconda2/lib/python2.7/site-packages/pandas/core/internals.py", line 4269, in _consolidate
_can_consolidate=_can_consolidate)
File "/home/ec2-user/anaconda2/lib/python2.7/site-packages/pandas/core/internals.py", line 4289, in _merge_blocks
new_values = _vstack([b.values for b in blocks], dtype)
File "/home/ec2-user/anaconda2/lib/python2.7/site-packages/pandas/core/internals.py", line 4335, in _vstack
return np.vstack(to_stack)
File "/home/ec2-user/anaconda2/lib/python2.7/site-packages/numpy/core/shape_base.py", line 230, in vstack
return _nx.concatenate([atleast_2d(_m) for _m in tup], 0)

更新:

根据 MRocklin 的回答,我们提供了一些附加信息。

这是我执行该过程的方式:

def dask_stats_calc(dfpath,v1,v2,v3...):
dfpath_ddf = dd.from_pandas(dfpath,npartitions=16,sort=False)
return dfpath_ddf.apply(calculate_stats,axis=1,args=(dfdaily,v1,v2,v3...)).compute(get=get).stack().reset_index(drop=True)

f_threaded = partial(dask_stats_calc,dfpath,v1,v2,v3...,multiprocessing.get)
f_threaded()

现在,dfpath 是一个包含 140 万行的 df,因此 dfpath_ddf.apply() 运行了超过 140 万行。

一旦整个dfpath_ddf.apply()完成,就会发生df.to_csv(),但正如您所说,最好定期写入磁盘。

现在的问题是,如何实现每隔 200k 行定期写入磁盘之类的功能?我想我可以将 dfpath_ddf 分成 200k block (或类似的东西)并按顺序运行每个 block ?

最佳答案

单线程执行

有时,任务会在等待写入磁盘上的单个文件时在 RAM 中构建。对于并行系统来说,使用这样的顺序输出本质上是很棘手的。如果您需要使用单个文件,那么我建议尝试相同的单线程计算,看看是否有区别。

with dask.set_options(get=dask.async.get_sync):
DF.to_csv('out.csv')

写入多个文件

或者(也是首选)您可以尝试写入许多 CSV 文件。这在调度上要容易得多,因为任务不必等到其前任任务完成才能写入磁盘并从 RAM 中删除自己。

DF.to_csv('out.*.csv')

示例

因此,并行执行和写入的一种常见且相当稳健的方法是将计算与最后对 to_csv 的调用结合起来

ddf = dd.from_pandas(df, npartitions=100)
ddf.apply(myfunc).to_csv('out.*.csv')

这会将数据帧分解为 block ,在每个 block 上调用函数,将该 block 写入磁盘,然后删除中间值,释放空间。

关于python - 在 EC2 上运行大数据计算时出现 dask.async.MemoryError,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/38444777/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com