
python - Parallelizing a Dask aggregation

Reposted. Author: 行者123. Updated: 2023-12-05 06:15:44

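Building on this post, I implemented a custom mode formula, but I found that this function has performance problems. Essentially, when execution reaches this aggregation, my cluster uses only one of my threads, which is not good for performance. I'm computing over 150+ attributes (mostly categorical data) across 16k rows, and I think I could split them into separate threads/processes and then put the results back together into a single dataframe. Note that this aggregation has to group on two columns, so I may get even worse performance because I can't use a single column as the index.

Is there a way to incorporate Dask futures or parallel processing into the aggregation computation?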

```python
import dask.dataframe as dd
from dask.distributed import Client
from pandas import DataFrame

def chunk(s):
    # Per partition: s is a SeriesGroupBy; count each (group, value) pair.
    return s.value_counts()

def agg(s):
    # Across partitions: _selected_obj (a private pandas attribute) returns the
    # underlying Series; sum the counts for identical (group, value) pairs.
    s = s._selected_obj
    return s.groupby(level=list(range(s.index.nlevels))).sum()

def finalize(s):
    # s is a multi-index series of the form (group, value): count. First
    # manually group on the group part of the index. The lambda will receive a
    # sub-series with multi index. Next, drop the group part from the index.
    # Finally, determine the index with the maximum value, i.e., the mode.
    level = list(range(s.index.nlevels - 1))
    return (
        s.groupby(level=level)
        # idxmax returns the label (the modal value); in pandas >= 1.0,
        # argmax returns an integer position instead.
        .apply(lambda s: s.reset_index(level=level, drop=True).idxmax())
    )

def main() -> DataFrame:
    client = Client('scheduler:8786')

    ddf = dd.read_csv('/sample/data.csv')
    custom_mode = dd.Aggregation('custom mode', chunk, agg, finalize)
    result = ddf.groupby(['a', 'b']).agg(custom_mode).compute()
    return result
```
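To see what the three callbacks passed to `dd.Aggregation` do, here is a minimal pure-Python sketch of the same chunk/agg/finalize decomposition. It assumes each partition is a list of dict rows (a hypothetical stand-in for a pandas partition) and is not Dask's implementation, only the same map-reduce idea: count `(group, value)` pairs per partition, sum those counts across partitions, then keep the most frequent value per group.

```python
from collections import Counter

def chunk(rows, keys, col):
    """Per-partition step: count (group, value) pairs, like value_counts()."""
    counts = Counter()
    for row in rows:
        group = tuple(row[k] for k in keys)  # two-column key, e.g. (a, b)
        counts[(group, row[col])] += 1
    return counts

def agg(partials):
    """Combine step: add up the partial counts from every partition."""
    total = Counter()
    for partial in partials:
        total.update(partial)  # Counter.update adds counts
    return total

def finalize(total):
    """Final step: per group, keep the value with the highest count.

    Ties go to the value that was counted first.
    """
    best = {}
    for (group, value), n in total.items():
        if group not in best or n > best[group][1]:
            best[group] = (value, n)
    return {group: value for group, (value, _) in best.items()}

# Hypothetical data: two partitions of rows with grouping columns "a" and "b".
partitions = [
    [{"a": 1, "b": "x", "color": "red"}, {"a": 1, "b": "x", "color": "blue"}],
    [{"a": 1, "b": "x", "color": "red"}, {"a": 2, "b": "y", "color": "green"}],
]
modes = finalize(agg(chunk(p, ["a", "b"], "color") for p in partitions))
print(modes)  # {(1, 'x'): 'red', (2, 'y'): 'green'}
```

Only the per-partition `chunk` calls are independent; `agg` and `finalize` need every partition's counts, which is why a small dataset read into a single partition gives little room for parallelism inside one aggregation.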

Side note: I'm using Docker to start my scheduler and workers from the daskdev/dask (2.18.1) Docker image.

Best answer

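In the end, I used futures to parallelize the aggregation of each column separately. Since I have so many columns, handing each column's aggregation to its own worker saved me a lot of time. Thanks to David's comment and to the article on parallel workloads from the Dask documentation!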
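The approach above is a fan-out/fan-in pattern: one task per column, then gather the results and join them on the group key. Below is a minimal stdlib sketch of that shape, with `concurrent.futures` standing in for `client.submit`/`client.gather` and hypothetical dict rows instead of a Dask DataFrame. It uses `ThreadPoolExecutor` so it runs anywhere; for CPU-bound pure-Python work the GIL limits threads, so `ProcessPoolExecutor` or Dask workers (as in the answer) would be needed for a real speedup.

```python
from collections import Counter
from concurrent.futures import ThreadPoolExecutor

def column_mode(rows, keys, col):
    """One task: mode of `col` within each group defined by `keys`."""
    counts = Counter((tuple(r[k] for k in keys), r[col]) for r in rows)
    best = {}
    for (group, value), n in counts.items():
        if group not in best or n > best[group][1]:
            best[group] = (value, n)
    return col, {g: v for g, (v, _) in best.items()}

def parallel_modes(rows, keys, max_workers=None):
    """Fan out one task per non-key column, then merge results by group."""
    cols = [c for c in rows[0] if c not in keys]
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        # Fan-out: analogous to client.submit(...) per column.
        futures = [pool.submit(column_mode, rows, keys, c) for c in cols]
        # Fan-in: analogous to client.gather(futures).
        results = [f.result() for f in futures]
    # Join side by side on the group key, like pd.concat(..., axis=1).
    table = {}
    for col, modes in results:
        for group, value in modes.items():
            table.setdefault(group, {})[col] = value
    return table

# Hypothetical rows with grouping columns "a" and "b".
rows = [
    {"a": 1, "b": "x", "color": "red", "size": "S"},
    {"a": 1, "b": "x", "color": "red", "size": "M"},
    {"a": 1, "b": "x", "color": "blue", "size": "M"},
    {"a": 2, "b": "y", "color": "green", "size": "L"},
]
table = parallel_modes(rows, ["a", "b"])
print(table)
```

Each column's mode is independent of the others, which is why splitting by column parallelizes cleanly; the join step only needs the shared `(a, b)` key.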

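```python
import dask.dataframe as dd
import pandas as pd
from dask.distributed import Client
from pandas import DataFrame

def chunk(s):
    # Per partition: count each (group, value) pair.
    return s.value_counts()

def agg(s):
    # Across partitions: sum the counts for identical (group, value) pairs.
    s = s._selected_obj
    return s.groupby(level=list(range(s.index.nlevels))).sum()

def finalize(s):
    # Per group: keep the value with the highest count, i.e. the mode.
    level = list(range(s.index.nlevels - 1))
    return (
        s.groupby(level=level)
        .apply(lambda s: s.reset_index(level=level, drop=True).idxmax())
    )

def delayed_mode(ddf, groupby, col, custom_agg):
    # One task: groupby plus the custom aggregation for a single column.
    return ddf.groupby(groupby).agg({col: custom_agg}).compute()

def main() -> DataFrame:
    client = Client('scheduler:8786')

    group_cols = ["a", "b"]
    ddf = dd.read_csv('/sample/data.csv')
    custom_mode = dd.Aggregation('custom mode', chunk, agg, finalize)

    # Submit one future per non-grouping column so each aggregation
    # can run on a different worker.
    futures = []
    for col in ddf.columns:
        if col in group_cols:
            continue
        future = client.submit(delayed_mode, ddf, group_cols, col, custom_mode)
        futures.append(future)

    # Collect the per-column results and join them side by side on (a, b).
    ddfs = client.gather(futures)
    result = pd.concat(ddfs, axis=1)
    return result
```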

A similar question about python - Parallelizing a Dask aggregation can be found on Stack Overflow: https://stackoverflow.com/questions/62352617/
