
Dask - drop duplicate index MemoryError


I get a MemoryError when I try to drop duplicate timestamps on a large dataframe with the following code:

import dask.dataframe as dd

path = f's3://{container_name}/*'
ddf = dd.read_parquet(path, storage_options=opts, engine='fastparquet')
ddf = ddf.reset_index().drop_duplicates(subset='timestamp_utc').set_index('timestamp_utc')
...

Profiling indicates that it uses about 14 GB of RAM on a dataset of 265 MB of compressed Parquet files containing roughly 40 million rows of data.

Is there another way to drop duplicate indices on the data with Dask without using so much memory?

Traceback below:

Traceback (most recent call last):
File "/anaconda/envs/surb/lib/python3.6/runpy.py", line 193, in _run_module_as_main
"__main__", mod_spec)
File "/anaconda/envs/surb/lib/python3.6/runpy.py", line 85, in _run_code
exec(code, run_globals)
File "/home/chengkai/surbana_lift/src/consolidate_data.py", line 62, in <module>
consolidate_data()
File "/home/chengkai/surbana_lift/src/consolidate_data.py", line 37, in consolidate_data
ddf = ddf.reset_index().drop_duplicates(subset='timestamp_utc').set_index('timestamp_utc')
File "/anaconda/envs/surb/lib/python3.6/site-packages/dask/dataframe/core.py", line 2524, in set_index
divisions=divisions, **kwargs)
File "/anaconda/envs/surb/lib/python3.6/site-packages/dask/dataframe/shuffle.py", line 64, in set_index
divisions, sizes, mins, maxes = base.compute(divisions, sizes, mins, maxes)
File "/anaconda/envs/surb/lib/python3.6/site-packages/dask/base.py", line 407, in compute
results = get(dsk, keys, **kwargs)
File "/anaconda/envs/surb/lib/python3.6/site-packages/dask/threaded.py", line 75, in get
pack_exception=pack_exception, **kwargs)
File "/anaconda/envs/surb/lib/python3.6/site-packages/dask/local.py", line 521, in get_async
raise_exception(exc, tb)
File "/anaconda/envs/surb/lib/python3.6/site-packages/dask/compatibility.py", line 67, in reraise
raise exc
File "/anaconda/envs/surb/lib/python3.6/site-packages/dask/local.py", line 290, in execute_task
result = _execute_task(task, data)
File "/anaconda/envs/surb/lib/python3.6/site-packages/dask/local.py", line 270, in _execute_task
args2 = [_execute_task(a, cache) for a in args]
File "/anaconda/envs/surb/lib/python3.6/site-packages/dask/local.py", line 270, in <listcomp>
args2 = [_execute_task(a, cache) for a in args]
File "/anaconda/envs/surb/lib/python3.6/site-packages/dask/local.py", line 267, in _execute_task
return [_execute_task(a, cache) for a in arg]
File "/anaconda/envs/surb/lib/python3.6/site-packages/dask/local.py", line 267, in <listcomp>
return [_execute_task(a, cache) for a in arg]
File "/anaconda/envs/surb/lib/python3.6/site-packages/dask/local.py", line 271, in _execute_task
return func(*args2)
File "/anaconda/envs/surb/lib/python3.6/site-packages/dask/dataframe/core.py", line 69, in _concat
return args[0] if not args2 else methods.concat(args2, uniform=True)
File "/anaconda/envs/surb/lib/python3.6/site-packages/dask/dataframe/methods.py", line 329, in concat
out = pd.concat(dfs3, join=join)
File "/anaconda/envs/surb/lib/python3.6/site-packages/pandas/core/reshape/concat.py", line 226, in concat
return op.get_result()
File "/anaconda/envs/surb/lib/python3.6/site-packages/pandas/core/reshape/concat.py", line 423, in get_result
copy=self.copy)
File "/anaconda/envs/surb/lib/python3.6/site-packages/pandas/core/internals.py", line 5418, in concatenate_block_manage
rs
[ju.block for ju in join_units], placement=placement)
File "/anaconda/envs/surb/lib/python3.6/site-packages/pandas/core/internals.py", line 2984, in concat_same_type
axis=self.ndim - 1)
File "/anaconda/envs/surb/lib/python3.6/site-packages/pandas/core/dtypes/concat.py", line 461, in _concat_datetime
return _concat_datetimetz(to_concat)
File "/anaconda/envs/surb/lib/python3.6/site-packages/pandas/core/dtypes/concat.py", line 506, in _concat_datetimetz
new_values = np.concatenate([x.asi8 for x in to_concat])
MemoryError

Best Answer

It is not surprising that the data becomes very large in memory: Parquet is a very space-efficient format, especially with gzip compression, whereas in memory every string becomes a Python object, which is why the footprint is so big.

In addition, you have many worker threads each operating on pieces of the overall dataframe. That involves copying data, intermediate results, and concatenating the results; the concatenation step is fairly inefficient in pandas.

One suggestion: you can remove a step by passing index=False to read_parquet instead of calling reset_index afterwards.
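
A minimal sketch of that change (container_name and opts are hypothetical placeholders standing in for the question's actual bucket and credentials):

import dask.dataframe as dd

# Hypothetical placeholders; use your own bucket name and storage options.
container_name = 'my-bucket'
opts = {'anon': False}

# index=False avoids building an index on read, so the separate
# reset_index() step is no longer needed.
path = f's3://{container_name}/*'
ddf = dd.read_parquet(path, storage_options=opts, engine='fastparquet', index=False)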

Next suggestion: limit the number of threads you use to something smaller than the default, which is probably your number of CPU cores. The easiest way is to use the distributed client in-process:

from dask.distributed import Client
c = Client(processes=False, threads_per_worker=4)

It may be better to set the index first and then do the drop_duplicates with map_partitions, to minimise cross-partition communication.

df.map_partitions(lambda d: d.drop_duplicates(subset='timestamp_utc'))
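
Putting the suggestions together, a rough sketch of the whole pipeline could look like the following. Because set_index makes timestamp_utc the index, this version drops duplicates via index.duplicated() rather than a column subset; that call, the keep='first' policy, and the placeholder bucket/options are assumptions, not part of the original answer.

import dask.dataframe as dd
from dask.distributed import Client

# In-process scheduler with a capped thread count to limit peak memory.
client = Client(processes=False, threads_per_worker=4)

# Hypothetical placeholders; use your own bucket name and storage options.
container_name = 'my-bucket'
opts = {'anon': False}

# index=False skips building an index on read, so no reset_index() is needed.
path = f's3://{container_name}/*'
ddf = dd.read_parquet(path, storage_options=opts, engine='fastparquet', index=False)

# Sort/shuffle once on the timestamp column...
ddf = ddf.set_index('timestamp_utc')

# ...then drop duplicate timestamps within each partition via the index.
ddf = ddf.map_partitions(lambda d: d[~d.index.duplicated(keep='first')])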

Regarding "Dask - drop duplicate index MemoryError", we found a similar question on Stack Overflow: https://stackoverflow.com/questions/51301034/
