gpt4 book ai didi

python dask DataFrame,支持(可简单并行化)行吗?

转载 作者:IT老高 更新时间:2023-10-28 21:51:23 34 4
gpt4 key购买 nike

我最近发现 dask旨在成为易于使用的python并行处理模块的模块。对我来说最大的卖点是它适用于 pandas。

在其手册页上阅读了一下之后,我找不到一种方法来完成这个琐碎的可并行化任务:

ts.apply(func) # for pandas series
df.apply(func, axis = 1) # for pandas DF row apply

目前,为了实现这一目标,AFAIK,

ddf.assign(A=lambda df: df.apply(func, axis=1)).compute() # dask DataFrame

这是一种丑陋的语法,实际上比完全慢

df.apply(func, axis = 1) # for pandas DF row apply

有什么建议吗?

编辑:感谢@MRocklin 提供 map 功能。它似乎比普通 Pandas 应用要慢。这与 Pandas GIL 发布问题有关还是我做错了?

import dask.dataframe as dd
s = pd.Series([10000]*120)
ds = dd.from_pandas(s, npartitions = 3)

def slow_func(k):
A = np.random.normal(size = k) # k = 10000
s = 0
for a in A:
if a > 0:
s += 1
else:
s -= 1
return s

s.apply(slow_func) # 0.43 sec
ds.map(slow_func).compute() # 2.04 sec

最佳答案

map_partitions

您可以使用 map_partitions 函数将您的函数应用于数据帧的所有分区。

df.map_partitions(func, columns=...)

请注意,func 一次只会给出数据集的一部分,而不是像 pandas apply 那样的整个数据集(如果你想进行并行处理,你可能不希望这样做。)

map/应用

您可以使用 map

在系列中逐行映射函数
df.mycolumn.map(func)

您可以使用 apply

在数据帧中逐行映射函数
df.apply(func, axis=1)

线程与进程

从 0.6.0 版开始,dask.dataframes 与线程并行化。自定义 Python 函数不会从基于线程的并行性中获得太多好处。你可以试试进程

df = dd.read_csv(...)

df.map_partitions(func, columns=...).compute(scheduler='processes')

但避免应用

但是,在 Pandas 和 Dask 中,您确实应该避免 apply 使用自定义 Python 函数。这通常是性能不佳的根源。可能是,如果您找到一种以矢量化方式进行操作的方法,那么您的 Pandas 代码可能会快 100 倍,并且您根本不需要 dask.dataframe。

考虑 numba

对于您的特定问题,您可以考虑numba .这会显着提高您的表现。

In [1]: import numpy as np
In [2]: import pandas as pd
In [3]: s = pd.Series([10000]*120)

In [4]: %paste
def slow_func(k):
A = np.random.normal(size = k) # k = 10000
s = 0
for a in A:
if a > 0:
s += 1
else:
s -= 1
return s
## -- End pasted text --

In [5]: %time _ = s.apply(slow_func)
CPU times: user 345 ms, sys: 3.28 ms, total: 348 ms
Wall time: 347 ms

In [6]: import numba
In [7]: fast_func = numba.jit(slow_func)

In [8]: %time _ = s.apply(fast_func) # First time incurs compilation overhead
CPU times: user 179 ms, sys: 0 ns, total: 179 ms
Wall time: 175 ms

In [9]: %time _ = s.apply(fast_func) # Subsequent times are all gain
CPU times: user 68.8 ms, sys: 27 µs, total: 68.8 ms
Wall time: 68.7 ms

免责声明,我为制作 numbadask 的公司工作,并雇佣了许多 pandas 开发人员。

关于python dask DataFrame,支持(可简单并行化)行吗?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/31361721/

34 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com