gpt4 book ai didi

python - dask 分布式数据帧上的慢 len 函数

转载 作者:太空狗 更新时间:2023-10-29 17:47:07 28 4
gpt4 key购买 nike

我一直在测试如何使用 dask(具有 20 个内核的集群),我对调用 len 函数与通过 loc 切片的速度相比感到惊讶。

import dask.dataframe as dd
from dask.distributed import Client
client = Client('192.168.1.220:8786')

log = pd.read_csv('800000test', sep='\t')
logd = dd.from_pandas(log,npartitions=20)

#This is the code than runs slowly
#(2.9 seconds whilst I would expect no more than a few hundred millisencods)

print(len(logd))

#Instead this code is actually running almost 20 times faster than pandas
logd.loc[:'Host'].count().compute()

知道为什么会发生这种情况吗? len 运行得快对我来说并不重要,但我觉得由于不理解这种行为,所以我对这个库有一些不了解的地方。

enter image description here

所有绿色框都对应于“from_pandas”,而在 Matthew Rocklin 的这篇文章中 http://matthewrocklin.com/blog/work/2017/01/12/dask-dataframes调用图看起来更好(调用 len_chunk 明显更快,并且调用似乎没有被锁定并等待一个工作人员完成他的任务,然后再开始另一个工作人员)

enter image description here

最佳答案

问得好,这涉及到数据何时向上移动到集群并返回到客户端(您的 Python session )的几点。让我们看看您计算的几个阶段

使用 Pandas 加载数据

这是您的 python session 中的 Pandas 数据框,因此它显然仍在您的本地进程中。

log = pd.read_csv('800000test', sep='\t')  # on client

转换为懒惰的 Dask.dataframe

这会将您的 Pandas 数据框分解为 20 个 Pandas 数据框,但这些数据框仍在客户端上。 Dask 数据帧不会急切地将数据发送到集群。

logd = dd.from_pandas(log,npartitions=20)  # still on client

计算长度

调用 len 实际上会在此处进行计算(通常您会使用 df.some_aggregation().compute()。所以现在 Dask 启动了。首先它将您的数据移出到集群(慢),然后它在所有 20 个分区上调用 len(快),它聚合这些(快),然后将结果向下移动到您的客户端,以便它可以打印。

print(len(logd))  # costly roundtrip client -> cluster -> client

分析

所以这里的问题是我们的 dask.dataframe 仍然在本地 python session 中拥有它的所有数据。

使用本地线程调度器比分布式调度器要快得多。这应该以毫秒计算

with dask.set_options(get=dask.threaded.get):  # no cluster, just local threads
print(len(logd)) # stays on client

但您可能想知道如何扩展到更大的数据集,所以让我们以正确的方式进行。

在worker上加载你的数据

让 Dask 工作人员加载 csv 文件的位,而不是在您的客户端/本地 session 中加载 Pandas。这样就不需要客户与工作人员的沟通。

# log = pd.read_csv('800000test', sep='\t')  # on client
log = dd.read_csv('800000test', sep='\t') # on cluster workers

但是,与 pd.read_csv 不同,dd.read_csv 是惰性的,因此它应该几乎立即返回。我们可以强制 Dask 使用 persist 方法实际进行计算

log = client.persist(log)  # triggers computation asynchronously

现在集群开始运行并直接在工作进程中加载​​数据。这个比较快。请注意,当工作在后台进行时,此方法会立即返回。如果您想等到它完成,请调用 wait

from dask.distributed import wait
wait(log) # blocks until read is done

如果您正在使用小型数据集进行测试并希望获得更多分区,请尝试更改 block 大小。

log = dd.read_csv(..., blocksize=1000000)  # 1 MB blocks

无论如何,log 上的操作现在应该很快

len(log)  # fast

编辑

回答关于 this blogpost 的问题以下是我们对文件所在位置所做的假设。

通常,当您向 dd.read_csv 提供文件名时,它假定该文件对所有工作人员都是可见的。如果您使用的是网络文件系统,或者像 S3 或 HDFS 这样的全局存储,这是正确的。如果您使用的是网络文件系统,那么您将需要使用绝对路径(如 /path/to/myfile.*.csv),或者确保您的工作人员和客户端具有相同的工作目录.

如果不是这种情况,并且您的数据仅在您的客户端计算机上,那么您将不得不加载并分散它。

简单但次优

简单的方法就是做你原来做的,但坚持你的 dask.dataframe

log = pd.read_csv('800000test', sep='\t')  # on client
logd = dd.from_pandas(log,npartitions=20) # still on client
logd = client.persist(logd) # moves to workers

这很好,但会导致沟通不尽如人意。

复杂但最优

相反,您可以明确地将数据分散到集群中

[future] = client.scatter([log])

虽然这会涉及到更复杂的 API,所以我只会向您指出文档

http://distributed.readthedocs.io/en/latest/manage-computation.html http://distributed.readthedocs.io/en/latest/memory.html http://dask.pydata.org/en/latest/delayed-collections.html

关于python - dask 分布式数据帧上的慢 len 函数,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/41902069/

28 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com