gpt4 book ai didi

python - 如何使用dask有效地计算许多简单统计数据

转载 作者:太空宇宙 更新时间:2023-11-03 15:38:42 25 4
gpt4 key购买 nike

问题
我想用dask计算一堆“容易收集”的统计数据。
速度是我最关心的问题,也是我的目标,所以我希望对这个问题有一个广泛的研究。
理想情况下,我希望在不到一小时内完成所描述的问题。
我希望雇佣100-1000名工人。
目前在基准测试中,我正在大型机器上运行这个程序(160核,4tb ram),但计划很快转移到kubernetes。
安装程序
我在数据框中有一些数据(pandas、dask、csv、parquet等)
我还有很多数据子集(带有任意列过滤器),我想为它们计算统计信息。
数据帧大小:介于5 GB和5 TB之间的数据(1亿行,1000列)。预计未来为50-100 TB。
统计数据大小:大约5000个唯一的过滤器,每个唯一的过滤器有1到500个统计数据。(5K-5M统计)
玩具示例如下:

requested_statistics = [
{'filters': [{'col': 'col_1', 'op': 'lt', 'value': 0.8},
{'col': 'col_38', 'op': 'lt', 'value': 0.4},
{'col': 'col_224', 'op': 'gt', 'value': 0.12333}],
'output': {'col': 'col_3', 'op': 'sum'},
'name': 'stat_1'},
{'filters': [{'col': 'col_551', 'op': 'lt', 'value': 0.8},
{'col': 'col_112', 'op': 'gt', 'value': '2018-01-13'},
{'col': 'col_1', 'op': 'lt', 'value': 0.8}],
'output': {'col': 'col_2', 'op': 'avg'},
'name': 'stat_2'}
]

我可以编写一个简单的解析器,运行在dask或pandas上:
def filter_index(df, filter):
filter_ops = {'lt': lambda x, y: x < y, 'gt': lambda x, y: x > y, 'eq': lambda x, y: x == y}
return filter_ops[filter['op']](df[filter['col']], filter['value'])

def get_indexer(df, filters):
if len(filters) == 1:
return filter_index(df, filters[0])
return np.logical_and(filter_index(df, filters[0]), get_indexer(df, filters[1:]))

def get_statistic(df, statistic):
indexer = get_indexer(df, statistic['filters'])
agg_ops = {'sum': np.sum, 'avg': np.mean, 'unique_count': lambda x: x.unique().size}
return agg_ops[statistic['output']['op']](df[statistic['output']['col']][indexer])

all_stats = {x['name']: get_statistic(df, x) for x in requested_statistics}

我试过一些优化。
1)只需依赖dask: future_stats = client.compute(all_stats)
这不起作用,因为优化图(或只是序列化到调度程序)的计算时间太长。
在小规模测试中,这种方法工作得很好,但当我放大nPartition时,这种方法的时间缩放效果似乎比O(N)差得多。
2)对每个统计数据进行计算( client.compute(stat, sync=True)client.compute(stat).result())。
这增加了与调度程序对话的太多开销,对于我试图计算的大约100000个统计数据来说,这将花费太长的时间。
3)缓存(通过持久化)中间结果(索引器),以便我可以重用。
考虑到过滤器有时可以共享索引器,我在 filter_indexget_indexer字段中添加了缓存。
具体来说,创建一个散列并 indexer = client.persist(indexer),在以后的调用中返回持久化索引器对于 get_indexer,我还添加了一个 combinations检查,试图查看高速缓存中是否存在任何过滤器子集。我还优化了调用统计信息的顺序,以便在下一个集合中最多只需要1个新的更改索引器。
(例如,一次完成所有共享同一个过滤器的操作,然后转到下一个)。
这就产生了一个不幸的后果,即需要大量的内存来保存所有的布尔掩码。
我还没有尝试滚动缓存(当计算运行时, cache.pop(index_id),一旦计算不再需要它持久化),但这是我的下一步。
当前的关键问题
上面列出的解决方案(3)是我目前实现的,但它的性能仍然不如我所希望的那样好。
内存开销非常高(有效地为每个唯一的筛选器创建一个完整的新列)
调度程序/图序列化似乎很昂贵
htop可以看出,大多数情况下,只有 dask-scheduler在以100%的速度运行,而工人大多处于空闲状态。
问题
1)我可以采取哪些其他方法,或者我的方法中是否有明显的遗漏?
2)我考虑过 df.query(string),但由于它在整个数据帧上运行,因此似乎效率低下(大量重复数据)这是真的吗,还是通过使用内置语法分析器取得了一些成功(我注意到dask图对此较小,但不确定它是否值得)。
3)调度器和单线程(?)dask图形生成器似乎是瓶颈,是否有明确的路径来并行化这些?
4)当我查看分布式bokeh状态观察程序时,我经常注意到它在这些计算过程中也会挂起,这使得调试变得困难,并且让我好奇使用web服务器是否真的会损害调度程序的性能?这是真的吗?
5)在日志中,我收到了很多 Event loop was unresponsive in Worker for Xs.警告我能做些什么来帮助平衡工作,或者重新编写分配给工作人员的任务,或者使调度程序更具响应性?
6)从减少图的复杂度的愿望来看,我有 repartition(npartitions=num_workers*2),但我不确定这是一个好的启发式还是我应该使用什么?
下面是调度程序正在管理的任务的一个示例(这是针对~25个唯一的筛选器,每个筛选器有~50个统计信息,总共有~1000个统计信息正在计算)。
https://i.imgur.com/hRzmXHP.png
感谢您对如何考虑优化这个问题的帮助或指导性建议。

最佳答案

我想到了两个一般性的建议,但是如果没有实践经验,很难诊断出这样的问题。听起来你已经在看仪表盘了,很高兴听到我将在这里集中讨论两个避免调度开销的建议,因为这是您特别提到的。
使用较大的分区
dd.read_csv等操作的默认分区大小足够小,可以在消费型笔记本电脑上使用。(我怀疑它们大约是128MB)考虑到节点的大小,您可以将其增加10倍(或更多)并保持良好状态。这将减少您的调度开销10倍。
使用高级图融合
截至2018年12月20日,这仍在开发分支中,但dask.dataframe开始在表达式级别而不是任务级别融合。这将有助于显著减少数千个统计数据的开销,从dask的角度来看,可能会将它们变成一个任务。
您可能需要跟踪以下prs:
https://github.com/dask/dask/pull/4092
https://github.com/dask/dask/pull/4229
我还鼓励您将您的用例作为github问题提供一个综合示例,以便它可以为将来的开发提供信息。我建议使用dask.datasets.timeseries()来生成一个假数据帧,然后使用一些简单的方法从中生成大量简单的统计数据(如果可能的话,简单的方法会更好,这样维护人员就不必陷得太深)。

关于python - 如何使用dask有效地计算许多简单统计数据,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/53844188/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com