
python - How to merge dataframes with dask without running out of memory?


Merging multiple dask dataframes crashes my computer.

Hello,

I am trying to merge a long list of csv files with dask. Each csv file contains the timestamps at which a variable changes its value, together with the value itself; e.g. for variable1 we have:

timestamp; value
2016-01-01T00:00:00; 3
2016-01-03T00:00:00; 4

and for variable2 we have:

timestamp; value
2016-01-02T00:00:00; 8
2016-01-04T00:00:00; 9

The timestamps in each csv may differ (since they are tied to the moments when that variable's value changes). As the end result I would like to obtain an hdf file in which every variable has a value at every timestamp that occurs, forward filled. So, something like the following:

timestamp; var1; var2
2016-01-01T00:00:00; 3 ; nan
2016-01-02T00:00:00; 3 ; 8
2016-01-03T00:00:00; 4 ; 8
2016-01-04T00:00:00; 4 ; 9
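
Just to pin down the target transformation, here is a minimal in-memory pandas sketch on the two toy variables above (an illustration only, not the dask solution):

import pandas as pd

var1 = pd.DataFrame({'var1': [3, 4]},
                    index=pd.to_datetime(['2016-01-01', '2016-01-03']))
var2 = pd.DataFrame({'var2': [8, 9]},
                    index=pd.to_datetime(['2016-01-02', '2016-01-04']))

# outer-align on the union of timestamps, then forward fill the gaps
out = pd.concat([var1, var2], axis=1).sort_index().ffill()
print(out)
#             var1  var2
# 2016-01-01   3.0   NaN
# 2016-01-02   3.0   8.0
# 2016-01-03   4.0   8.0
# 2016-01-04   4.0   9.0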

Below, I provide the meta-code I use to implement this parsing and merging.

# imports
import os
from pathlib import Path
from functools import partial
import pandas as pd
import dask.dataframe as dd
import dask.bag as db
from dask import delayed
from dask.diagnostics import ProgressBar

# define how to parse the dates
def parse_dates(df):
    return pd.to_datetime(df['timestamp'], format='%Y-%m-%dT%H:%M:%S', errors='coerce')

# parse csv files to dask dataframe
def parse_csv2filtered_ddf(fn_file, source_dir):
    fn = source_dir.joinpath(fn_file)
    ddf = dd.read_csv(fn, sep=';', usecols=['timestamp', 'value'],
                      blocksize=10000000, dtype={'value': 'object'})
    meta = ('timestamp', 'datetime64[ns]')
    ddf['timestamp'] = ddf.map_partitions(parse_dates, meta=meta)
    v = fn_file.split('.csv')[0]
    ddf = ddf.dropna() \
             .rename(columns={'value': v}) \
             .set_index('timestamp')
    return ddf

# define how to merge
def merge_ddf(x, y):
    ddf = x.merge(y, how='outer', left_index=True, right_index=True, npartitions=4)
    return ddf

# set source directory
source_dir = Path('/path_to_list_of_csv_files/')

# get list of files to parse
lcsv = os.listdir(source_dir)

# make partial function to fix source_dir
parse_csv2filtered_ddf_partial = partial(parse_csv2filtered_ddf, source_dir=source_dir)

# make bag of dataframes
b = db.from_sequence(lcsv).map(parse_csv2filtered_ddf_partial)

# merge all dataframes and reduce to 1 dataframe
df = b.fold(binop=merge_ddf)

# forward fill the NaNs and drop the remaining
#
# please note that I am choosing here npartitions equal to 48 as
# experiments with smaller sets of data allow me to estimate
# the output size of the df which should be around 48 GB, hence
# choosing 48 should lead to partitions of about 1 GB, I guess.
df = delayed(df).repartition(npartitions=48). \
    fillna(method='ffill'). \
    dropna()

# write output to hdf file
df = df.to_hdf(output_fn, '/data')

# start computation
with ProgressBar():
    df.compute(scheduler='threads')

Unfortunately, this script never runs to completion. In particular, when monitoring the memory usage I can watch the memory fill up completely, after which either the computer or the program crashes.

I have tried using only a single thread, as well as combining it with multiple processes; e.g.

import dask
dask.config.set(scheduler='single-threaded')

combined with

with ProgressBar():
    df.compute(scheduler='processes', num_workers=3)

again without success.
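
Another knob I could turn would be the distributed scheduler with an explicit per-worker memory limit. A minimal sketch (assuming dask.distributed is installed; the worker count and memory limit below are placeholders):

from dask.distributed import Client

# local cluster: a few workers, one thread each, and a hard memory cap per worker (placeholder values)
client = Client(n_workers=3, threads_per_worker=1, memory_limit='4GB')

df.compute()  # once a Client exists, compute() uses the distributed scheduler by default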

Any pointers in the right direction are warmly welcomed.

EDIT

Below, I provide a more concise script that generates similar data and should reproduce the MemoryError.

import numpy as np
import pandas as pd
from dask import delayed
from dask import dataframe as dd
from dask import array as da
from dask import bag as db
from dask.diagnostics import ProgressBar
from datetime import datetime
from datetime import timedelta
from functools import partial

def make_ddf(col, values, timestamps):
    n = int(col) % 2
    idx_timestamps = timestamps[n::2]
    df = pd.DataFrame.from_dict({str(col): values, 'timestamp': idx_timestamps})
    ddf = dd.from_pandas(df, chunksize=100000000)
    ddf = ddf.dropna() \
             .set_index('timestamp')
    return ddf

def merge_ddf(x, y):
    ddf = x.merge(y, how='outer', left_index=True, right_index=True, npartitions=4)
    return ddf

N_DF_TO_MERGE = 55 # number of dataframes to merge
N_PARTITIONS_REPARTITION = 55

values = np.random.randn(5000000, 1).flatten()
timestamps = [datetime.now() + timedelta(seconds=i*1) for i in range(10000000)]
columns = list(range(N_DF_TO_MERGE))

# fix values and times
make_ddf_partial = partial(make_ddf, values=values, timestamps=timestamps)

# make bag
b = db.from_sequence(columns).map(make_ddf_partial)

# merge all dataframes and reduce to one
df = b.fold(binop=merge_ddf)

# forward fill the NaNs and drop the remaining
df = delayed(df).repartition(npartitions=N_PARTITIONS_REPARTITION). \
    fillna(method='ffill'). \
    dropna()

# write output to hdf file
df = df.to_hdf('magweg.hdf', '/data')

with ProgressBar():
    df.compute(scheduler='threads')

This leads to the following error:

Traceback (most recent call last):
  File "mcve.py", line 63, in <module>: main()
  File "C:\Users\tomasvanoyen\Miniconda3\envs\stora\lib\site-packages\click\core.py", line 764, in __call__: return self.main(*args, **kwargs)
  File "C:\Users\tomasvanoyen\Miniconda3\envs\stora\lib\site-packages\click\core.py", line 717, in main: rv = self.invoke(ctx)
  File "C:\Users\tomasvanoyen\Miniconda3\envs\stora\lib\site-packages\click\core.py", line 956, in invoke: return ctx.invoke(self.callback, **ctx.params)
  File "C:\Users\tomasvanoyen\Miniconda3\envs\stora\lib\site-packages\click\core.py", line 555, in invoke: return callback(*args, **kwargs)
  File "mcve.py", line 59, in main: df.compute(scheduler='threads')
  File "C:\Users\tomasvanoyen\Miniconda3\envs\stora\lib\site-packages\dask\base.py", line 156, in compute: (result,) = compute(self, traverse=False, **kwargs)
  File "C:\Users\tomasvanoyen\Miniconda3\envs\stora\lib\site-packages\dask\base.py", line 398, in compute: results = schedule(dsk, keys, **kwargs)
  File "C:\Users\tomasvanoyen\Miniconda3\envs\stora\lib\site-packages\dask\threaded.py", line 76, in get: pack_exception=pack_exception, **kwargs)
  File "C:\Users\tomasvanoyen\Miniconda3\envs\stora\lib\site-packages\dask\local.py", line 459, in get_async: raise_exception(exc, tb)
  File "C:\Users\tomasvanoyen\Miniconda3\envs\stora\lib\site-packages\dask\compatibility.py", line 112, in reraise: raise exc
  File "C:\Users\tomasvanoyen\Miniconda3\envs\stora\lib\site-packages\dask\local.py", line 230, in execute_task: result = _execute_task(task, data)
  File "C:\Users\tomasvanoyen\Miniconda3\envs\stora\lib\site-packages\dask\core.py", line 119, in _execute_task: return func(*args2)
  File "C:\Users\tomasvanoyen\Miniconda3\envs\stora\lib\site-packages\dask\utils.py", line 697, in __call__: return getattr(obj, self.method)(*args, **kwargs)
  File "C:\Users\tomasvanoyen\Miniconda3\envs\stora\lib\site-packages\dask\dataframe\core.py", line 1154, in to_hdf: return to_hdf(self, path_or_buf, key, mode, append, **kwargs)
  File "C:\Users\tomasvanoyen\Miniconda3\envs\stora\lib\site-packages\dask\dataframe\io\hdf.py", line 227, in to_hdf: scheduler=scheduler, **dask_kwargs)
  File "C:\Users\tomasvanoyen\Miniconda3\envs\stora\lib\site-packages\dask\base.py", line 166, in compute_as_if_collection: return schedule(dsk2, keys, **kwargs)
  File "C:\Users\tomasvanoyen\Miniconda3\envs\stora\lib\site-packages\dask\threaded.py", line 76, in get: pack_exception=pack_exception, **kwargs)
  File "C:\Users\tomasvanoyen\Miniconda3\envs\stora\lib\site-packages\dask\local.py", line 459, in get_async: raise_exception(exc, tb)
  File "C:\Users\tomasvanoyen\Miniconda3\envs\stora\lib\site-packages\dask\compatibility.py", line 112, in reraise: raise exc
  File "C:\Users\tomasvanoyen\Miniconda3\envs\stora\lib\site-packages\dask\local.py", line 230, in execute_task: result = _execute_task(task, data)
  File "C:\Users\tomasvanoyen\Miniconda3\envs\stora\lib\site-packages\dask\core.py", line 119, in _execute_task: return func(*args2)
  File "C:\Users\tomasvanoyen\Miniconda3\envs\stora\lib\site-packages\dask\dataframe\methods.py", line 103, in boundary_slice: result = getattr(df, kind)[start:stop]
  File "C:\Users\tomasvanoyen\Miniconda3\envs\stora\lib\site-packages\pandas\core\indexing.py", line 1500, in __getitem__: return self._getitem_axis(maybe_callable, axis=axis)
  File "C:\Users\tomasvanoyen\Miniconda3\envs\stora\lib\site-packages\pandas\core\indexing.py", line 1867, in _getitem_axis: return self._get_slice_axis(key, axis=axis)
  File "C:\Users\tomasvanoyen\Miniconda3\envs\stora\lib\site-packages\pandas\core\indexing.py", line 1536, in _get_slice_axis: return self._slice(indexer, axis=axis, kind='iloc')
  File "C:\Users\tomasvanoyen\Miniconda3\envs\stora\lib\site-packages\pandas\core\indexing.py", line 151, in _slice: return self.obj._slice(obj, axis=axis, kind=kind)
  File "C:\Users\tomasvanoyen\Miniconda3\envs\stora\lib\site-packages\pandas\core\generic.py", line 3152, in _slice: result = self._constructor(self._data.get_slice(slobj, axis=axis))
  File "C:\Users\tomasvanoyen\Miniconda3\envs\stora\lib\site-packages\pandas\core\internals\managers.py", line 700, in get_slice: bm._consolidate_inplace()
  File "C:\Users\tomasvanoyen\Miniconda3\envs\stora\lib\site-packages\pandas\core\internals\managers.py", line 929, in _consolidate_inplace: self.blocks = tuple(_consolidate(self.blocks))
  File "C:\Users\tomasvanoyen\Miniconda3\envs\stora\lib\site-packages\pandas\core\internals\managers.py", line 1899, in _consolidate: _can_consolidate=_can_consolidate)
  File "C:\Users\tomasvanoyen\Miniconda3\envs\stora\lib\site-packages\pandas\core\internals\blocks.py", line 3146, in _merge_blocks: new_values = np.vstack([b.values for b in blocks])
  File "C:\Users\tomasvanoyen\Miniconda3\envs\stora\lib\site-packages\numpy\core\shape_base.py", line 234, in vstack: return _nx.concatenate([atleast_2d(_m) for _m in tup], 0)
MemoryError

Best Answer

Two things look odd to me.

  1. You are calling dask dataframe code from within dask.bag code.
  2. You seem to be calling merge when you probably just want to concatenate? (A rough sketch of the concatenation approach follows below.)
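
A sketch of what point 2 could look like. This is only an illustration under assumptions: each individual csv fits in memory for a per-file pandas read, the timestamp index is sorted so divisions are known for dd.concat(axis=1), and the paths, chunk sizes and partition counts are placeholders.

from pathlib import Path
import pandas as pd
import dask.dataframe as dd

def load_one(path):
    # read one variable's csv and index it by timestamp
    name = path.stem  # e.g. 'variable1'
    df = pd.read_csv(path, sep=';', parse_dates=['timestamp'])
    df = df.rename(columns={'value': name}).set_index('timestamp').sort_index()
    return dd.from_pandas(df, chunksize=1000000)

paths = sorted(Path('/path_to_list_of_csv_files/').glob('*.csv'))
frames = [load_one(p) for p in paths]

# align all variables on the timestamp index in one go instead of pairwise merges
out = dd.concat(frames, axis=1)
out = out.ffill().dropna().repartition(npartitions=48)
out.to_hdf('output.hdf', '/data')  # to_hdf computes eagerly by default

If a single csv is too large for a pandas read, dd.read_csv followed by set_index(..., sorted=True) per file would be the dask-native variant of load_one.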

Regarding "python - How to merge dataframes with dask without running out of memory?", we found a similar question on Stack Overflow: https://stackoverflow.com/questions/54594803/
