gpt4 book ai didi

python - 有效地将函数并行应用于分组的 pandas DataFrame

转载 作者:IT老高 更新时间:2023-10-28 21:10:14 26 4
gpt4 key购买 nike

我经常需要将一个函数应用到一个非常大的DataFrame(混合数据类型)的组中,并希望利用多个内核。

我可以从组中创建一个迭代器并使用多处理模块,但效率不高,因为每个组和函数的结果都必须为进程之间的消息传递进行腌制。

有什么方法可以避免酸洗甚至完全避免 DataFrame 的复制?看起来多处理模块的共享内存功能仅限于 numpy 数组。还有其他选择吗?

最佳答案

从上面的评论来看,这似乎是为 pandas 计划的(我刚刚注意到还有一个看起来很有趣的 rosetta project)。

然而,在所有并行功能都被合并到 pandas 之前,我注意到直接使用 cythonpandas 编写高效且非内存复制的并行扩充非常容易+ OpenMP 和 C++。

这是一个编写并行 groupby-sum 的简短示例,其用法如下:

import pandas as pd
import para_group_demo

df = pd.DataFrame({'a': [1, 2, 1, 2, 1, 1, 0], 'b': range(7)})
print para_group_demo.sum(df.a, df.b)

输出是:

     sum
key
0 6
1 11
2 4

注意 毫无疑问,这个简单示例的功能最终将成为 pandas 的一部分。然而,有些事情在 C++ 中并行化一段时间会更自然,重要的是要知道将其组合到 pandas 中是多么容易。


为此,我编写了一个简单的单源文件扩展名,其代码如下。

从一些导入和类型定义开始

from libc.stdint cimport int64_t, uint64_t
from libcpp.vector cimport vector
from libcpp.unordered_map cimport unordered_map

cimport cython
from cython.operator cimport dereference as deref, preincrement as inc
from cython.parallel import prange

import pandas as pd

ctypedef unordered_map[int64_t, uint64_t] counts_t
ctypedef unordered_map[int64_t, uint64_t].iterator counts_it_t
ctypedef vector[counts_t] counts_vec_t

C++的unordered_map类型是单线程求和,vector是所有线程求和。

现在到函数 sum。它以 typed memory views 开头,用于快速访问:

def sum(crit, vals):
cdef int64_t[:] crit_view = crit.values
cdef int64_t[:] vals_view = vals.values

该函数继续将半等分到线程(这里硬编码为 4),并让每个线程将其范围内的条目相加:

    cdef uint64_t num_threads = 4
cdef uint64_t l = len(crit)
cdef uint64_t s = l / num_threads + 1
cdef uint64_t i, j, e
cdef counts_vec_t counts
counts = counts_vec_t(num_threads)
counts.resize(num_threads)
with cython.boundscheck(False):
for i in prange(num_threads, nogil=True):
j = i * s
e = j + s
if e > l:
e = l
while j < e:
counts[i][crit_view[j]] += vals_view[j]
inc(j)

当线程完成时,该函数将所有结果(来自不同范围)合并到单个 unordered_map:

    cdef counts_t total
cdef counts_it_t it, e_it
for i in range(num_threads):
it = counts[i].begin()
e_it = counts[i].end()
while it != e_it:
total[deref(it).first] += deref(it).second
inc(it)

剩下的就是创建一个DataFrame并返回结果:

    key, sum_ = [], []
it = total.begin()
e_it = total.end()
while it != e_it:
key.append(deref(it).first)
sum_.append(deref(it).second)
inc(it)

df = pd.DataFrame({'key': key, 'sum': sum_})
df.set_index('key', inplace=True)
return df

关于python - 有效地将函数并行应用于分组的 pandas DataFrame,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/11728836/

26 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com