- c - 在位数组中找到第一个零
- linux - Unix 显示有关匹配两种模式之一的文件的信息
- 正则表达式替换多个文件
- linux - 隐藏来自 xtrace 的命令
我一直在测试如何使用 dask(具有 20 个内核的集群),我对调用 len 函数与通过 loc 切片的速度相比感到惊讶。
import dask.dataframe as dd
from dask.distributed import Client
client = Client('192.168.1.220:8786')
log = pd.read_csv('800000test', sep='\t')
logd = dd.from_pandas(log,npartitions=20)
#This is the code than runs slowly
#(2.9 seconds whilst I would expect no more than a few hundred millisencods)
print(len(logd))
#Instead this code is actually running almost 20 times faster than pandas
logd.loc[:'Host'].count().compute()
知道为什么会发生这种情况吗? len 运行得快对我来说并不重要,但我觉得由于不理解这种行为,所以我对这个库有一些不了解的地方。
所有绿色框都对应于“from_pandas”,而在 Matthew Rocklin 的这篇文章中 http://matthewrocklin.com/blog/work/2017/01/12/dask-dataframes调用图看起来更好(调用 len_chunk 明显更快,并且调用似乎没有被锁定并等待一个工作人员完成他的任务,然后再开始另一个工作人员)
最佳答案
问得好,这涉及到数据何时向上移动到集群并返回到客户端(您的 Python session )的几点。让我们看看您计算的几个阶段
这是您的 python session 中的 Pandas 数据框,因此它显然仍在您的本地进程中。
log = pd.read_csv('800000test', sep='\t') # on client
这会将您的 Pandas 数据框分解为 20 个 Pandas 数据框,但这些数据框仍在客户端上。 Dask 数据帧不会急切地将数据发送到集群。
logd = dd.from_pandas(log,npartitions=20) # still on client
调用 len
实际上会在此处进行计算(通常您会使用 df.some_aggregation().compute()
。所以现在 Dask 启动了。首先它将您的数据移出到集群(慢),然后它在所有 20 个分区上调用 len(快),它聚合这些(快),然后将结果向下移动到您的客户端,以便它可以打印。
print(len(logd)) # costly roundtrip client -> cluster -> client
所以这里的问题是我们的 dask.dataframe 仍然在本地 python session 中拥有它的所有数据。
使用本地线程调度器比分布式调度器要快得多。这应该以毫秒计算
with dask.set_options(get=dask.threaded.get): # no cluster, just local threads
print(len(logd)) # stays on client
但您可能想知道如何扩展到更大的数据集,所以让我们以正确的方式进行。
让 Dask 工作人员加载 csv 文件的位,而不是在您的客户端/本地 session 中加载 Pandas。这样就不需要客户与工作人员的沟通。
# log = pd.read_csv('800000test', sep='\t') # on client
log = dd.read_csv('800000test', sep='\t') # on cluster workers
但是,与 pd.read_csv
不同,dd.read_csv
是惰性的,因此它应该几乎立即返回。我们可以强制 Dask 使用 persist 方法实际进行计算
log = client.persist(log) # triggers computation asynchronously
现在集群开始运行并直接在工作进程中加载数据。这个比较快。请注意,当工作在后台进行时,此方法会立即返回。如果您想等到它完成,请调用 wait
。
from dask.distributed import wait
wait(log) # blocks until read is done
如果您正在使用小型数据集进行测试并希望获得更多分区,请尝试更改 block 大小。
log = dd.read_csv(..., blocksize=1000000) # 1 MB blocks
无论如何,log
上的操作现在应该很快
len(log) # fast
回答关于 this blogpost 的问题以下是我们对文件所在位置所做的假设。
通常,当您向 dd.read_csv
提供文件名时,它假定该文件对所有工作人员都是可见的。如果您使用的是网络文件系统,或者像 S3 或 HDFS 这样的全局存储,这是正确的。如果您使用的是网络文件系统,那么您将需要使用绝对路径(如 /path/to/myfile.*.csv
),或者确保您的工作人员和客户端具有相同的工作目录.
如果不是这种情况,并且您的数据仅在您的客户端计算机上,那么您将不得不加载并分散它。
简单的方法就是做你原来做的,但坚持你的 dask.dataframe
log = pd.read_csv('800000test', sep='\t') # on client
logd = dd.from_pandas(log,npartitions=20) # still on client
logd = client.persist(logd) # moves to workers
这很好,但会导致沟通不尽如人意。
相反,您可以明确地将数据分散到集群中
[future] = client.scatter([log])
虽然这会涉及到更复杂的 API,所以我只会向您指出文档
http://distributed.readthedocs.io/en/latest/manage-computation.html http://distributed.readthedocs.io/en/latest/memory.html http://dask.pydata.org/en/latest/delayed-collections.html
关于python - dask 分布式数据帧上的慢 len 函数,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/41902069/
如果我有一个依赖于某些全局或其他常量的函数,如下所示: x = 123 def f(partition): return partition + x # note that x is def
我们可以通过哪些方式在 Dask Arrays 中执行项目分配?即使是一个非常简单的项目分配,如:a[0] = 2 不起作用。 最佳答案 正确的。这是文档中提到的第一个限制。 通常,涉及 for 循环
[mapr@impetus-i0057 latest_code_deepak]$ dask-worker 172.26.32.37:8786 distributed.nanny - INFO -
我正在构建一个 FastAPI 应用程序,它将为 Dask 数组的 block 提供服务。我想利用 FastAPI's asynchronous functionality旁边Dask-distrib
在延迟数据帧处理的几个阶段之后,我需要在保存数据帧之前对其进行重新分区。但是,.repartition() 方法要求我知道分区的数量(而不是分区的大小),这取决于处理后数据的大小,这是未知的。 我想我
我正在努力转换 dask.bag将字典放入 dask.delayed pandas.DataFrames进入决赛 dask.dataframe 我有一个函数 (make_dict) 将文件读入一个相当
我正在尝试使用 dask_cudf/dask 读取单个大型 parquet 文件(大小 > gpu_size),但它目前正在读取它到一个分区中,我猜这是从文档字符串推断出的预期行为: dask.dat
当启动一个 dask 分布式本地集群时,您可以为 dashboard_address 设置一个随机端口或地址。 如果稍后获取scheduler对象。有没有办法提取仪表板的地址。 我有这个: clust
我有一个 dask 数据框,由 parquet 支持。它有 1.31 亿行,当我对整个帧执行一些基本操作时,它们需要几分钟。 df = dd.read_parquet('data_*.pqt') un
我正在使用 24 个 vCPU 的谷歌云计算实例。运行代码如下 import dask.dataframe as dd from distributed import Client client =
我正在尝试在多台机器上分发一个大型 Dask 数据帧,以便(稍后)在数据帧上进行分布式计算。我为此使用了 dask-distributed。 我看到的所有 dask 分布式示例/文档都是从网络资源(h
我在 Django 服务器后面使用 Dask,这里总结了我的基本设置:https://github.com/MoonVision/django-dask-demo/可以在这里找到 Dask 客户端:h
我有以下格式的 Dask DataFrame: date hour device param value 20190701 21 dev_01 att_1 0.00
我正在尝试使用 dask 而不是 Pandas,因为我有 2.6gb csv 文件。 我加载它,我想删除一列。但似乎无论是 drop 方法 df.drop('column') 或切片 df[ : ,
我有一个比我的内存大得多的文本文件。我想按字典顺序对该文件的行进行排序。我知道如何手动完成: 分成适合内存的块 对块进行排序 合并块 我想用 dask 来做。我认为处理大量数据将是 dask 的一个用
使用 Dask 的分布式调度程序时,我有一个正在远程工作人员上运行的任务,我想停止该任务。 我该如何阻止?我知道取消方法,但如果任务已经开始执行,这似乎不起作用。 最佳答案 如果它还没有运行 如果任务
我需要将一个非常大的 dask.bag 的元素提交到一个非线程安全的存储区,即我需要类似的东西 for x in dbag: store.add(x) 我无法使用compute,因为包太大,无
如果我有一个已经索引的 Dask 数据框 >>> A.divisions (None, None) >>> A.npartitions 1 我想设置分区,到目前为止我正在做 A.reset_index
根据 this回答,如果 Dask 知道数据帧的索引已排序,则 Dask 数据帧可以执行智能索引。 如果索引已排序,我如何让 Dask 知道? 在我的具体情况下,我正在做这样的事情: for sour
我想从具有特定数量的工作人员的 python 启动本地集群,然后将客户端连接到它。 cluster = LocalCluster(n_workers=8, ip='127.0.0.1') client
我是一名优秀的程序员,十分优秀!