gpt4 book ai didi

python - 使用 Pandas 提高大型 HDFStore 表的查询性能

转载 作者:太空狗 更新时间:2023-10-30 01:29:04 26 4
gpt4 key购买 nike

我有一个大型(约 1.6 亿行)数据框,我已将其存储到磁盘中,如下所示:

    def fillStore(store, tablename):
files = glob.glob('201312*.csv')
names = ["ts", "c_id", "f_id","resp_id","resp_len", "s_id"]
for f in files:
df = pd.read_csv(f, parse_dates=True, index_col=0, names=names)
store.append(tablename, df, format='table', data_columns=['c_id','f_id'])

该表有一个时间索引,除了时间(通过索引)之外,我还将使用 c_idf_id 进行查询。

我有另一个包含约 18000 个“事件”的数据框。每个事件都包含一些(少则数百,多则数十万)条记录。我需要为每个事件收集一些简单的统计数据并将它们存储起来,以便收集一些汇总统计数据。目前我这样做是这样的:

def makeQueryString(c, f, start, stop):
return "c_id == {} & f_id == {} & index >= Timestamp('{}') & index < Timestamp('{}')".format(c, f , str(pd.to_datetime(start)),str(pd.to_datetime(stop)))

def getIncidents(inc_times, store, tablename):
incidents = pd.DataFrame(columns = ['c_id','f_id','resp_id','resp_len','s_id','incident_id'])
for ind, row in inc_times.iterrows():
incidents = incidents.append(store.select(tablename,
makeQueryString(row.c_id,
row.f_id,
row.start,
row.stop))).fillna(ind)
return incidents

除了每个 store.select() 语句大约需要 5 秒之外,这一切都很好,这意味着处理整个月的数据需要 24-30 小时的处理时间。同时,我需要的实际统计数据相对简单:

def getIncidentStats(df):
incLen = (df.index[-1]-df.index[0]).total_seconds()
if incLen == 0:
incLen = .1
rqsts = len(df)
rqstRate_s = rqsts/incLen
return pd.Series({'c_id':df.c_id[0],
'f_id':df.fqdn_id[0],
'Length_sec':incLen,
'num_rqsts':rqsts,
'rqst_rate':rqstRate_s,
'avg_resp_size':df.response_len.mean(),
'std_resp_size':df.response_len.std()})


incs = getIncidents(i_times, store, tablename)
inc_groups = incs.groupby('incident_id')
inc_stats = inc_groups.apply(getIncidentStats)

我的问题是:我怎样才能提高这个工作流程的任何部分的性能或效率?(请注意,我实际上对大部分作业进行批处理,以获取和存储事件,一天一次时间只是因为我想限制在发生崩溃时丢失已处理数据的风险。为了简单起见,我将这段代码留在此处,因为我实际上需要处理整个月的数据。)

有没有一种方法可以在我从商店收到数据时对其进行处理,这样做有什么好处吗?我会从使用 store.select_as_index 中受益吗?如果我收到一个索引,我仍然需要访问数据以获得正确的统计数据吗?

其他注意事项/问题:我比较了将 HDFStore 存储在 SSD 和普通硬盘驱动器上的性能,并没有发现 SSD 有任何改进。这是预期的吗?

我还考虑过创建大量查询字符串并同时请求它们的想法。当总查询字符串太大(~5-10 个查询)时,这会导致内存错误。

编辑 1 如果重要的话,我使用的是表版本 3.1.0 和 pandas 版本 0.13.1

编辑 2 这里有更多信息:

ptdump -av store.h5
/ (RootGroup) ''
/._v_attrs (AttributeSet), 4 attributes:
[CLASS := 'GROUP',
PYTABLES_FORMAT_VERSION := '2.0',
TITLE := '',
VERSION := '1.0']
/all_recs (Group) ''
/all_recs._v_attrs (AttributeSet), 14 attributes:
[CLASS := 'GROUP',
TITLE := '',
VERSION := '1.0',
data_columns := ['c_id', 'f_id'],
encoding := None,
index_cols := [(0, 'index')],
info := {1: {'type': 'Index', 'names': [None]}, 'index': {'index_name': 'ts'}},
levels := 1,
nan_rep := 'nan',
non_index_axes := [(1, ['c_id', 'f_id', 'resp_id', 'resp_len', 'dns_server_id'])],
pandas_type := 'frame_table',
pandas_version := '0.10.1',
table_type := 'appendable_frame',
values_cols := ['values_block_0', 'c_id', 'f_id']]
/all_recs/table (Table(161738653,)) ''
description := {
"index": Int64Col(shape=(), dflt=0, pos=0),
"values_block_0": Int64Col(shape=(3,), dflt=0, pos=1),
"c_id": Int64Col(shape=(), dflt=0, pos=2),
"f_id": Int64Col(shape=(), dflt=0, pos=3)}
byteorder := 'little'
chunkshape := (5461,)
autoindex := True
colindexes := {
"index": Index(6, medium, shuffle, zlib(1)).is_csi=False,
"f_id": Index(6, medium, shuffle, zlib(1)).is_csi=False,
"c_id": Index(6, medium, shuffle, zlib(1)).is_csi=False}
/all_recs/table._v_attrs (AttributeSet), 19 attributes:
[CLASS := 'TABLE',
FIELD_0_FILL := 0,
FIELD_0_NAME := 'index',
FIELD_1_FILL := 0,
FIELD_1_NAME := 'values_block_0',
FIELD_2_FILL := 0,
FIELD_2_NAME := 'c_id',
FIELD_3_FILL := 0,
FIELD_3_NAME := 'f_id',
NROWS := 161738653,
TITLE := '',
VERSION := '2.6',
client_id_dtype := 'int64',
client_id_kind := ['c_id'],
fqdn_id_dtype := 'int64',
fqdn_id_kind := ['f_id'],
index_kind := 'datetime64',
values_block_0_dtype := 'int64',
values_block_0_kind := ['s_id', 'resp_len', 'resp_id']]

以下是主表和 inc_times 的示例:

In [12]: df.head()
Out[12]:
c_id f_id resp_id resp_len \
ts
2013-12-04 08:00:00 637092486 5372764353 30 56767543
2013-12-04 08:00:01 637092486 5399580619 23 61605423
2013-12-04 08:00:04 5456242 5385485460 21 46742687
2013-12-04 08:00:04 5456242 5385485460 21 49909681
2013-12-04 08:00:04 624791800 5373236646 14 70461449

s_id
ts
2013-12-04 08:00:00 1829
2013-12-04 08:00:01 1724
2013-12-04 08:00:04 1679
2013-12-04 08:00:04 1874
2013-12-04 08:00:04 1727

[5 rows x 5 columns]


In [13]: inc_times.head()
Out[13]:
c_id f_id start stop
0 7254 196211 1385880945000000000 1385880960000000000
1 9286 196211 1387259840000000000 1387259850000000000
2 16032 196211 1387743730000000000 1387743735000000000
3 19793 196211 1386208175000000000 1386208200000000000
4 19793 196211 1386211800000000000 1386211810000000000

[5 rows x 4 columns]

关于c_id和f_id,我想从全store中选取的id集合相对于store中的id总数来说是比较少的。也就是说,inc_times中有一些流行的ID,我会反复查询,而完全忽略一些全表存在的ID。我估计我关心的 ID 大约占 ID 总数的 10%,但这些是最流行的 ID,因此它们的记录在整个 ID 中占主导地位。

我有 16GB 内存。完整存储为 7.4G,完整数据集(作为 csv 文件)仅为 8.7GB。最初我相信我能够将整个东西加载到内存中并且至少对其进行一些有限的操作,但是我在加载整个东西时遇到内存错误。因此,将其批处理为每日文件(完整文件包含一个月的数据)。

最佳答案

这里有一些建议,类似的问题是 here

使用压缩:参见 here .你应该试试这个(这可能会使它更快/更慢,具体取决于你正在查询的内容),YMMV。

ptrepack --chunkshape=auto --propindexes --complevel=9 --complib=blosc in.h5 out.h5

在 block 中使用分层查询。我的意思是这个。由于您关心的 c_idf_id 数量相对较少,因此可以像这样构造一个查询。这有点像使用 isin

f_ids = list_of_f_ids that I care about
c_ids = list_of_c_ids that I care about

def create_batches(l, maxn=32):
""" create a list of batches, maxed at maxn """
batches = []
while(True):
if len(l) <= maxn:
if len(l) > 0:
batches.append(l)
break
batches.append(l[0:maxn])
l = l[maxn:]
return batches


results = []
for f_id_batch in create_batches(f_id_list):

for c_id_batch in create_batches(c_id_list):

q = "f_id={f_id} & c_id={c_id}".format(
f_id=f_id_batch,
c_id=c_id_batch)

# you can include the max/min times in here as well (they would be max/min
# time for ALL the included batches though, maybe easy for you to compute

result = store.select('df',where=q)

# sub process this result

def f(x):
# you will need to filter out the min/max timestamps here (which I gather
# are somewhat dependent on f_id/c_id group

#### process the data and return something
# you could do something like: ``return x.describe()`` for simple stats

results.append(result.groupby(['f_id','c_id').apply(f))

results = pd.concat(results)

这里的关键是处理使得 isin 的成员不超过 32 个对于您正在查询的任何变量。这是一个内部 numpy/pytables 限制。如果超过这个,查询将工作,但它会删除该变量并重新索引在所有数据上(这不是你想要的)。

通过这种方式,只需几个循环,您就可以在内存中获得一个很好的数据子集。这些查询我认为与您的大多数查询所花费的时间大致相同,但您的时间会少得多。

对于给定的子集,查询时间大致恒定(除非数据经过排序以使其完全索引)。

所以查询扫描数据“ block ”(这是索引指向的)。如果您在许多 block 中有很多命中,那么查询会变慢。

举个例子

In [5]: N = 100000000

In [6]: df = DataFrame(np.random.randn(N,3),columns=['A','B','C'])

In [7]: df['c_id'] = np.random.randint(0,10,size=N)

In [8]: df['f_id'] = np.random.randint(0,10,size=N)

In [9]: df.index = date_range('20130101',periods=N,freq='s')

In [10]: df.to_hdf('test2.h5','df',mode='w',data_columns=['c_id','f_id'])

In [11]: df.head()
Out[11]:
A B C c_id f_id
2013-01-01 00:00:00 0.037287 1.153534 0.639669 8 7
2013-01-01 00:00:01 1.741046 0.459821 0.194282 8 3
2013-01-01 00:00:02 -2.273919 -0.141789 0.770567 1 1
2013-01-01 00:00:03 0.320879 -0.108426 -1.310302 8 6
2013-01-01 00:00:04 -1.445810 -0.777090 -0.148362 5 5
2013-01-01 00:00:05 1.608211 0.069196 0.025021 3 6
2013-01-01 00:00:06 -0.561690 0.613579 1.071438 8 2
2013-01-01 00:00:07 1.795043 -0.661966 1.210714 0 0
2013-01-01 00:00:08 0.176347 -0.461176 1.624514 3 6
2013-01-01 00:00:09 -1.084537 1.941610 -1.423559 9 1
2013-01-01 00:00:10 -0.101036 0.925010 -0.809951 0 9
2013-01-01 00:00:11 -1.185520 0.968519 2.871983 7 5
2013-01-01 00:00:12 -1.089267 -0.333969 -0.665014 3 6
2013-01-01 00:00:13 0.544427 0.130439 0.423749 5 7
2013-01-01 00:00:14 0.112216 0.404801 -0.061730 5 4
2013-01-01 00:00:15 -1.349838 -0.639435 0.993495 0 9


In [2]: %timeit pd.read_hdf('test2.h5','df',where="f_id=[1] & c_id=[2]")
1 loops, best of 3: 13.9 s per loop

In [3]: %timeit pd.read_hdf('test2.h5','df',where="f_id=[1,2] & c_id=[1,2]")
1 loops, best of 3: 21.2 s per loop

In [4]: %timeit pd.read_hdf('test.2h5','df',where="f_id=[1,2,3] & c_id=[1,2,3]")
1 loops, best of 3: 42.8 s per loop

这个特定示例是 5GB 未压缩和 2.9GB 压缩。这些结果是关于压缩数据的。在这种情况下,使用未压缩的实际上要快得多(例如,第一个循环需要 3.5 秒)。这是 100MM 行。

因此,使用最后一个示例 (4),您获得的数据是第一个示例的 9 倍,查询时间略多于 3 倍。

但是您的加速应该更多,因为您不会选择单个时间戳,而是稍后再做。

整个方法考虑到您有足够的主内存来保存批量大小的结果(例如,您在批量查询中选择了集合中相对较小的部分)。

关于python - 使用 Pandas 提高大型 HDFStore 表的查询性能,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/22777284/

26 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com