- mongodb - 在 MongoDB mapreduce 中,如何展平值对象?
- javascript - 对象传播与 Object.assign
- html - 输入类型 ="submit"Vs 按钮标签它们可以互换吗?
- sql - 使用 MongoDB 而不是 MS SQL Server 的优缺点
我最近发现 dask旨在成为易于使用的python并行处理模块的模块。对我来说最大的卖点是它适用于 pandas。
在其手册页上阅读了一下之后,我找不到一种方法来完成这个琐碎的可并行化任务:
ts.apply(func) # for pandas series
df.apply(func, axis = 1) # for pandas DF row apply
目前,为了实现这一目标,AFAIK,
ddf.assign(A=lambda df: df.apply(func, axis=1)).compute() # dask DataFrame
这是一种丑陋的语法,实际上比完全慢
df.apply(func, axis = 1) # for pandas DF row apply
有什么建议吗?
编辑:感谢@MRocklin 提供 map 功能。它似乎比普通 Pandas 应用要慢。这与 Pandas GIL 发布问题有关还是我做错了?
import dask.dataframe as dd
s = pd.Series([10000]*120)
ds = dd.from_pandas(s, npartitions = 3)
def slow_func(k):
A = np.random.normal(size = k) # k = 10000
s = 0
for a in A:
if a > 0:
s += 1
else:
s -= 1
return s
s.apply(slow_func) # 0.43 sec
ds.map(slow_func).compute() # 2.04 sec
最佳答案
map_partitions
您可以使用 map_partitions
函数将您的函数应用于数据帧的所有分区。
df.map_partitions(func, columns=...)
请注意,func 一次只会给出数据集的一部分,而不是像 pandas apply
那样的整个数据集(如果你想进行并行处理,你可能不希望这样做。)
map
/应用
您可以使用 map
df.mycolumn.map(func)
您可以使用 apply
df.apply(func, axis=1)
从 0.6.0 版开始,dask.dataframes
与线程并行化。自定义 Python 函数不会从基于线程的并行性中获得太多好处。你可以试试进程
df = dd.read_csv(...)
df.map_partitions(func, columns=...).compute(scheduler='processes')
应用
但是,在 Pandas 和 Dask 中,您确实应该避免 apply
使用自定义 Python 函数。这通常是性能不佳的根源。可能是,如果您找到一种以矢量化方式进行操作的方法,那么您的 Pandas 代码可能会快 100 倍,并且您根本不需要 dask.dataframe。
numba
对于您的特定问题,您可以考虑numba
.这会显着提高您的表现。
In [1]: import numpy as np
In [2]: import pandas as pd
In [3]: s = pd.Series([10000]*120)
In [4]: %paste
def slow_func(k):
A = np.random.normal(size = k) # k = 10000
s = 0
for a in A:
if a > 0:
s += 1
else:
s -= 1
return s
## -- End pasted text --
In [5]: %time _ = s.apply(slow_func)
CPU times: user 345 ms, sys: 3.28 ms, total: 348 ms
Wall time: 347 ms
In [6]: import numba
In [7]: fast_func = numba.jit(slow_func)
In [8]: %time _ = s.apply(fast_func) # First time incurs compilation overhead
CPU times: user 179 ms, sys: 0 ns, total: 179 ms
Wall time: 175 ms
In [9]: %time _ = s.apply(fast_func) # Subsequent times are all gain
CPU times: user 68.8 ms, sys: 27 µs, total: 68.8 ms
Wall time: 68.7 ms
免责声明,我为制作 numba
和 dask
的公司工作,并雇佣了许多 pandas
开发人员。
关于python dask DataFrame,支持(可简单并行化)行吗?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/31361721/
如果我有一个依赖于某些全局或其他常量的函数,如下所示: x = 123 def f(partition): return partition + x # note that x is def
我们可以通过哪些方式在 Dask Arrays 中执行项目分配?即使是一个非常简单的项目分配,如:a[0] = 2 不起作用。 最佳答案 正确的。这是文档中提到的第一个限制。 通常,涉及 for 循环
[mapr@impetus-i0057 latest_code_deepak]$ dask-worker 172.26.32.37:8786 distributed.nanny - INFO -
我正在构建一个 FastAPI 应用程序,它将为 Dask 数组的 block 提供服务。我想利用 FastAPI's asynchronous functionality旁边Dask-distrib
在延迟数据帧处理的几个阶段之后,我需要在保存数据帧之前对其进行重新分区。但是,.repartition() 方法要求我知道分区的数量(而不是分区的大小),这取决于处理后数据的大小,这是未知的。 我想我
我正在努力转换 dask.bag将字典放入 dask.delayed pandas.DataFrames进入决赛 dask.dataframe 我有一个函数 (make_dict) 将文件读入一个相当
我正在尝试使用 dask_cudf/dask 读取单个大型 parquet 文件(大小 > gpu_size),但它目前正在读取它到一个分区中,我猜这是从文档字符串推断出的预期行为: dask.dat
当启动一个 dask 分布式本地集群时,您可以为 dashboard_address 设置一个随机端口或地址。 如果稍后获取scheduler对象。有没有办法提取仪表板的地址。 我有这个: clust
我有一个 dask 数据框,由 parquet 支持。它有 1.31 亿行,当我对整个帧执行一些基本操作时,它们需要几分钟。 df = dd.read_parquet('data_*.pqt') un
我正在使用 24 个 vCPU 的谷歌云计算实例。运行代码如下 import dask.dataframe as dd from distributed import Client client =
我正在尝试在多台机器上分发一个大型 Dask 数据帧,以便(稍后)在数据帧上进行分布式计算。我为此使用了 dask-distributed。 我看到的所有 dask 分布式示例/文档都是从网络资源(h
我在 Django 服务器后面使用 Dask,这里总结了我的基本设置:https://github.com/MoonVision/django-dask-demo/可以在这里找到 Dask 客户端:h
我有以下格式的 Dask DataFrame: date hour device param value 20190701 21 dev_01 att_1 0.00
我正在尝试使用 dask 而不是 Pandas,因为我有 2.6gb csv 文件。 我加载它,我想删除一列。但似乎无论是 drop 方法 df.drop('column') 或切片 df[ : ,
我有一个比我的内存大得多的文本文件。我想按字典顺序对该文件的行进行排序。我知道如何手动完成: 分成适合内存的块 对块进行排序 合并块 我想用 dask 来做。我认为处理大量数据将是 dask 的一个用
使用 Dask 的分布式调度程序时,我有一个正在远程工作人员上运行的任务,我想停止该任务。 我该如何阻止?我知道取消方法,但如果任务已经开始执行,这似乎不起作用。 最佳答案 如果它还没有运行 如果任务
我需要将一个非常大的 dask.bag 的元素提交到一个非线程安全的存储区,即我需要类似的东西 for x in dbag: store.add(x) 我无法使用compute,因为包太大,无
如果我有一个已经索引的 Dask 数据框 >>> A.divisions (None, None) >>> A.npartitions 1 我想设置分区,到目前为止我正在做 A.reset_index
根据 this回答,如果 Dask 知道数据帧的索引已排序,则 Dask 数据帧可以执行智能索引。 如果索引已排序,我如何让 Dask 知道? 在我的具体情况下,我正在做这样的事情: for sour
我想从具有特定数量的工作人员的 python 启动本地集群,然后将客户端连接到它。 cluster = LocalCluster(n_workers=8, ip='127.0.0.1') client
我是一名优秀的程序员,十分优秀!