- html - 出于某种原因,IE8 对我的 Sass 文件中继承的 html5 CSS 不友好?
- JMeter 在响应断言中使用 span 标签的问题
- html - 在 :hover and :active? 上具有不同效果的 CSS 动画
- html - 相对于居中的 html 内容固定的 CSS 重复背景?
合并多个 dask 数据帧会使我的计算机崩溃。
你好,
我正在尝试将一长串 csv 文件与 dask 合并。每个 csv 文件都包含变量更改其值时的时间戳列表,以及值;例如对于 variable1 我们有:
timestamp; value
2016-01-01T00:00:00; 3
2016-01-03T00:00:00; 4
而对于变量 2,我们有:
timestamp; value
2016-01-02T00:00:00; 8
2016-01-04T00:00:00; 9
每个 csv 中的时间戳可能不同(因为它们与变量值更改的时刻相关联)。作为最终结果,我想获得一个 hdf 文件,其中每个变量在每个发生的时间戳中都有值,前向填充。因此,类似于以下内容:
timestamp; var1; var2,
2016-01-01T00:00:00; 3 ; nan
2016-01-02T00:00:00; 3 ; 8
2016-01-03T00:00:00; 4 ; 8
2016-01-04T00:00:00; 4 ; 9
下面,我提供了用于实现此解析和合并的元代码。
# import
from pathlib import Path
from functools import partial
import import dask.dataframe as dd
import dask.bag as db
from dask import delayed
from dask.diagnostics import ProgressBar
# define how to parse the dates
def parse_dates(df):
return pd.to_datetime(df['timestamp'], format='%Y-%m-%dT%H:%M:%S', errors='coerce')
# parse csv files to dask dataframe
def parse_csv2filtered_ddf(fn_file, sourcedir):
fn = source_dir.joinpath(fn_tag)
ddf = dd.read_csv(fn, sep=';', usecols=['timestamp', 'value'],
blocksize=10000000, dtype={'value': 'object'})
meta = ('timestamp', 'datetime64[ns]')
ddf['timestamp'] = ddf.map_partitions(parse_dates, meta=meta)
v = fn_file.split('.csv')[0]
ddf = ddf.dropna() \
.rename(columns={'value': v}) \
.set_index('timestamp')
return ddf
# define how to merge
def merge_ddf(x, y):
ddf = x.merge(y, how='outer', left_index=True, right_index=True, npartitions=4)
return ddf
# set source directory
source_dir = Path('/path_to_list_of_csv_files/')
# get list of files to parse
lcsv = os.listdir(source_dir)
# make partial function to fix sourcedir
parse_csv2filtered_ddf_partial = partial(parse_csv2filtered_ddf, source_dir)
# make bag of dataframes
b = db.from_sequence(lcsv).map(parse_csv2filtered_ddf_partial)
# merge all dataframes and reduce to 1 dataframe
df = b.fold(binop=merge_ddf)
# forward fill the NaNs and drop the remaining
#
# please note that I am choosing here npartitions equal to 48 as
# experiments with smaller sets of data allow me to estimate
# the output size of the df which should be around 48 GB, hence
# chosing 48 should lead to partition of 1 GB, I guess.
df = delayed(df).repartition(npartitions=48). \
fillna(method='ffill'). \
dropna()
# write output to hdf file
df = df.to_hdf(output_fn, '/data')
# start computation
with ProgressBar():
df.compute(scheduler='threads')
不幸的是,这个脚本永远不会成功运行。特别是监控内存使用情况,我可以跟随内存完全流动起来,之后要么是电脑崩溃,要么是程序崩溃。
我试过只使用一个线程,结合多个进程;例如
import dask
dask.config.set(scheduler='single-threaded')
结合
with ProgressBar():
df.compute(scheduler='processes', num_workers=3)
同样没有成功。
热烈欢迎任何指向正确方向的指示。
编辑
下面,我提供了一个更简洁的脚本,它应该允许生成类似的数据来重现 MemoryError。
import numpy as np
import pandas as pd
from dask import delayed
from dask import dataframe as dd
from dask import array as da
from dask import bag as db
from dask.diagnostics import ProgressBar
from datetime import datetime
from datetime import timedelta
from functools import partial
def make_ddf(col, values, timestamps):
n = int(col) % 2
idx_timestamps = timestamps[n::2]
df = pd.DataFrame.from_dict({str(col): values, 'timestamp': idx_time})
ddf = dd.from_pandas(df, chunksize=100000000)
ddf = ddf.dropna() \
.set_index('timestamp')
return ddf
def merge_ddf(x, y):
ddf = x.merge(y, how='outer', left_index=True, right_index=True, npartitions=4)
return ddf
N_DF_TO_MERGE = 55 # number of dataframes to merge
N_PARTITIONS_REPARTITION = 55
values = np.random.randn(5000000, 1).flatten()
timestamps = [datetime.now() + timedelta(seconds=i*1) for i in range(10000000)]
columns = list(range(N_DF_TO_MERGE))
# fix values and times
make_ddf_partial = partial(make_ddf, values=values, timestamps=timestamps)
# make bag
b = db.from_sequence(columns).map(make_ddf_partial)
# merge all dataframes and reduce to one
df = b.fold(binop=merge_ddf)
# forward fill the NaNs and drop the remaining
df = delayed(df).repartition(npartitions=N_PARTITIONS_REPARTITION). \
fillna(method='ffill'). \
dropna()
# write output to hdf file
df = df.to_hdf('magweg.hdf', '/data')
with ProgressBar():
df.compute(scheduler='threads')
这会导致以下错误:
Traceback (most recent call last): File "mcve.py", line 63, in main() File "C:\Users\tomasvanoyen\Miniconda3\envs\stora\lib\site-packages\click\core.py", line 764, in call return self.main(*args, **kwargs) File "C:\Users\tomasvanoyen\Miniconda3\envs\stora\lib\site-packages\click\core.py", line 717, in main rv = self.invoke(ctx) File "C:\Users\tomasvanoyen\Miniconda3\envs\stora\lib\site-packages\click\core.py", line 956, in invoke return ctx.invoke(self.callback, **ctx.params) File "C:\Users\tomasvanoyen\Miniconda3\envs\stora\lib\site-packages\click\core.py", line 555, in invoke return callback(*args, **kwargs) File "mcve.py", line 59, in main df.compute(scheduler='threads') File "C:\Users\tomasvanoyen\Miniconda3\envs\stora\lib\site-packages\dask\base.py", line 156, in compute (result,) = compute(self, traverse=False, **kwargs) File "C:\Users\tomasvanoyen\Miniconda3\envs\stora\lib\site-packages\dask\base.py", line 398, in compute results = schedule(dsk, keys, **kwargs) File "C:\Users\tomasvanoyen\Miniconda3\envs\stora\lib\site-packages\dask\threaded.py", line 76, in get pack_exception=pack_exception, **kwargs) File "C:\Users\tomasvanoyen\Miniconda3\envs\stora\lib\site-packages\dask\local.py", line 459, in get_async raise_exception(exc, tb) File "C:\Users\tomasvanoyen\Miniconda3\envs\stora\lib\site-packages\dask\compatibility.py", line 112, in reraise raise exc File "C:\Users\tomasvanoyen\Miniconda3\envs\stora\lib\site-packages\dask\local.py", line 230, in execute_task result = _execute_task(task, data) File "C:\Users\tomasvanoyen\Miniconda3\envs\stora\lib\site-packages\dask\core.py", line 119, in _execute_task return func(*args2) File "C:\Users\tomasvanoyen\Miniconda3\envs\stora\lib\site-packages\dask\utils.py", line 697, in call return getattr(obj, self.method)(*args, **kwargs) File "C:\Users\tomasvanoyen\Miniconda3\envs\stora\lib\site-packages\dask\dataframe\core.py", line 1154, in to_hdf return to_hdf(self, path_or_buf, key, mode, append, **kwargs) File "C:\Users\tomasvanoyen\Miniconda3\envs\stora\lib\site-packages\dask\dataframe\io\hdf.py", line 227, in to_hdf scheduler=scheduler, **dask_kwargs) File "C:\Users\tomasvanoyen\Miniconda3\envs\stora\lib\site-packages\dask\base.py", line 166, in compute_as_if_collection return schedule(dsk2, keys, **kwargs) File "C:\Users\tomasvanoyen\Miniconda3\envs\stora\lib\site-packages\dask\threaded.py", line 76, in get pack_exception=pack_exception, **kwargs) File "C:\Users\tomasvanoyen\Miniconda3\envs\stora\lib\site-packages\dask\local.py", line 459, in get_async raise_exception(exc, tb) File "C:\Users\tomasvanoyen\Miniconda3\envs\stora\lib\site-packages\dask\compatibility.py", line 112, in reraise raise exc File "C:\Users\tomasvanoyen\Miniconda3\envs\stora\lib\site-packages\dask\local.py", line 230, in execute_task result = _execute_task(task, data) File "C:\Users\tomasvanoyen\Miniconda3\envs\stora\lib\site-packages\dask\core.py", line 119, in _execute_task return func(*args2) File "C:\Users\tomasvanoyen\Miniconda3\envs\stora\lib\site-packages\dask\dataframe\methods.py", line 103, in boundary_slice result = getattr(df, kind)[start:stop] File "C:\Users\tomasvanoyen\Miniconda3\envs\stora\lib\site-packages\pandas\core\indexing.py", line 1500, in getitem return self._getitem_axis(maybe_callable, axis=axis) File "C:\Users\tomasvanoyen\Miniconda3\envs\stora\lib\site-packages\pandas\core\indexing.py", line 1867, in _getitem_axis return self._get_slice_axis(key, axis=axis) File "C:\Users\tomasvanoyen\Miniconda3\envs\stora\lib\site-packages\pandas\core\indexing.py", line 1536, in _get_slice_axis return self._slice(indexer, axis=axis, kind='iloc') File "C:\Users\tomasvanoyen\Miniconda3\envs\stora\lib\site-packages\pandas\core\indexing.py", line 151, in _slice return self.obj._slice(obj, axis=axis, kind=kind) File "C:\Users\tomasvanoyen\Miniconda3\envs\stora\lib\site-packages\pandas\core\generic.py", line 3152, in _slice result = self._constructor(self._data.get_slice(slobj, axis=axis)) File "C:\Users\tomasvanoyen\Miniconda3\envs\stora\lib\site-packages\pandas\core\internals\managers.py", line 700, in get_slice bm._consolidate_inplace() File "C:\Users\tomasvanoyen\Miniconda3\envs\stora\lib\site-packages\pandas\core\internals\managers.py", line 929, in _consolidate_inplace self.blocks = tuple(_consolidate(self.blocks)) File "C:\Users\tomasvanoyen\Miniconda3\envs\stora\lib\site-packages\pandas\core\internals\managers.py", line 1899, in _consolidate _can_consolidate=_can_consolidate) File "C:\Users\tomasvanoyen\Miniconda3\envs\stora\lib\site-packages\pandas\core\internals\blocks.py", line 3146, in _merge_blocks new_values = np.vstack([b.values for b in blocks]) File "C:\Users\tomasvanoyen\Miniconda3\envs\stora\lib\site-packages\numpy\core\shape_base.py", line 234, in vstack return _nx.concatenate([atleast_2d(_m) for _m in tup], 0) MemoryError
最佳答案
有两件事看起来很奇怪。
关于python - 如何在不耗尽内存的情况下将数据帧与 dask 合并?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/54594803/
初学者 android 问题。好的,我已经成功写入文件。例如。 //获取文件名 String filename = getResources().getString(R.string.filename
我已经将相同的图像保存到/data/data/mypackage/img/中,现在我想显示这个全屏,我曾尝试使用 ACTION_VIEW 来显示 android 标准程序,但它不是从/data/dat
我正在使用Xcode 9,Swift 4。 我正在尝试使用以下代码从URL在ImageView中显示图像: func getImageFromUrl(sourceUrl: String) -> UII
我的 Ubuntu 安装 genymotion 有问题。主要是我无法调试我的数据库,因为通过 eclipse 中的 DBMS 和 shell 中的 adb 我无法查看/data/文件夹的内容。没有显示
我正在尝试用 PHP 发布一些 JSON 数据。但是出了点问题。 这是我的 html -- {% for x in sets %}
我观察到两种方法的结果不同。为什么是这样?我知道 lm 上发生了什么,但无法弄清楚 tslm 上发生了什么。 > library(forecast) > set.seed(2) > tts lm(t
我不确定为什么会这样!我有一个由 spring data elasticsearch 和 spring data jpa 使用的类,但是当我尝试运行我的应用程序时出现错误。 Error creatin
在 this vega 图表,如果我下载并转换 flare-dependencies.json使用以下 jq 到 csv命令, jq -r '(map(keys) | add | unique) as
我正在提交一个项目,我必须在其中创建一个带有表的 mysql 数据库。一切都在我这边进行,所以我只想检查如何将我所有的压缩文件发送给使用不同计算机的人。基本上,我如何为另一台计算机创建我的数据库文件,
我有一个应用程序可以将文本文件写入内部存储。我想仔细看看我的电脑。 我运行了 Toast.makeText 来显示路径,它说:/数据/数据/我的包 但是当我转到 Android Studio 的 An
我喜欢使用 Genymotion 模拟器以如此出色的速度加载 Android。它有非常好的速度,但仍然有一些不稳定的性能。 如何从 Eclipse 中的文件资源管理器访问 Genymotion 模拟器
我需要更改 Silverlight 中文本框的格式。数据通过 MVVM 绑定(bind)。 例如,有一个 int 属性,我将 1 添加到 setter 中的值并调用 OnPropertyChanged
我想向 Youtube Data API 提出请求,但我不需要访问任何用户信息。我只想浏览公共(public)视频并根据搜索词显示视频。 我可以在未经授权的情况下这样做吗? 最佳答案 YouTube
我已经设置了一个 Twilio 应用程序,我想向人们发送更新,但我不想回复单个文本。我只是想让他们在有问题时打电话。我一切正常,但我想在发送文本时显示传入文本,以确保我不会错过任何问题。我正在使用 p
我有一个带有表单的网站(目前它是纯 HTML,但我们正在切换到 JQuery)。流程是这样的: 接受用户的输入 --- 5 个整数 通过 REST 调用网络服务 在服务器端运行一些计算...并生成一个
假设我们有一个名为 configuration.js 的文件,当我们查看内部时,我们会看到: 'use strict'; var profile = { "project": "%Projec
这部分是对 Previous Question 的扩展我的: 我现在可以从我的 CI Controller 成功返回 JSON 数据,它返回: {"results":[{"id":"1","Sourc
有什么有效的方法可以删除 ios 中 CBL 的所有文档存储?我对此有疑问,或者,如果有人知道如何从本质上使该应用程序像刚刚安装一样,那也会非常有帮助。我们正在努力确保我们的注销实际上将应用程序设置为
我有一个 Rails 应用程序,它与其他 Rails 应用程序通信以进行数据插入。我使用 jQuery $.post 方法进行数据插入。对于插入,我的其他 Rails 应用程序显示 200 OK。但在
我正在为服务于发布请求的 API 调用运行单元测试。我正在传递请求正文,并且必须将响应作为帐户数据返回。但我只收到断言错误 注意:数据是从 Azure 中获取的 spec.js const accou
我是一名优秀的程序员,十分优秀!