gpt4 book ai didi

Pandas :优化我的代码 (groupby()/apply())

转载 作者:行者123 更新时间:2023-12-04 06:17:24 25 4
gpt4 key购买 nike

我有一个形状 (RxC) 1.5M x 128 的数据框。我执行以下操作:

  • 我基于 6 列进行 groupby() 。这将创建约 8700 个子组,每个子组的形状为 538 x 122。
  • 在每个子组上,我运行 apply()。此函数计算子组中每个分类值 PER 列(即 122)的频率百分比。

  • 所以我的(伪)代码:
    <df = Read dataframe from file>
    g = df.groupby(grp_cols)
    g[nongrp_cols].apply(lambda d: d.apply(lambda s: s.value_counts()) / len(d.index))

    该代码对我来说工作正常,所以现在我正在对其进行分析以提高性能。 apply() 函数运行大约需要 20-25 分钟。我相信问题在于它在每一列(122 次)上迭代 8700 次(每个子组),这可能不是最好的方法(考虑到我的编码方式)。

    谁能推荐我可以尝试加快速度的方法?

    我尝试使用 python 多处理池(8 个进程)将子组分成相等的集合进行处理,但最终得到了一些酸洗错误......

    谢谢。

    最佳答案

    pd.DataFrame.groupby.apply 确实为我们提供了很大的灵 active (与 agg/filter/transform 不同,它允许您将每个子组 reshape 为任何形状,在您的情况下,从 538 x 122 到 N_categories x 122)。但这确实是有代价的:一个接一个地应用您的灵活功能并且缺乏矢量化。

    我仍然认为解决它的方法是使用多处理。您遇到的泡菜错误很可能是因为您在 multi_processing_function 中定义了一些函数。规则是您必须将所有函数移到顶层。请参阅下面的代码。

    import pandas as pd
    import numpy as np

    # simulate your data with int 0 - 9 for categorical values
    df = pd.DataFrame(np.random.choice(np.arange(10), size=(538, 122)))
    # simulate your groupby operations, not so cracy with 8700 sub-groups, just try 800 groups for illustration
    sim_keys = ['ROW' + str(x) for x in np.arange(800)]
    big_data = pd.concat([df] * 800, axis=0, keys=sim_keys)
    big_data.shape

    big_data.shape
    Out[337]: (430400, 122)

    # Without multiprocessing
    # ===================================================
    by_keys = big_data.groupby(level=0)

    sample_group = list(by_keys)[0][1]
    sample_group.shape

    def your_func(g):
    return g.apply(lambda s: s.value_counts()) / len(g.index)

    def test_no_multiprocessing(gb, apply_func):
    return gb.apply(apply_func)

    %time result_no_multiprocessing = test_no_multiprocessing(by_keys, your_func)

    CPU times: user 1min 26s, sys: 4.03 s, total: 1min 30s
    Wall time: 1min 27

    这里很慢。让我们使用多处理模块:
    # multiprocessing for pandas dataframe apply
    # ===================================================
    # to void pickle error, must define functions at TOP level, if we move this function 'process' into 'test_with_multiprocessing', it raises a pickle error
    def process(df):
    return df.groupby(level=0).apply(your_func)

    def test_with_multiprocessing(big_data, apply_func):

    import multiprocessing as mp

    p = mp.Pool(processes=8)
    # split it into 8 chunks
    split_dfs = np.array_split(big_data, 8, axis=0)
    # define the mapping function, wrapping it to take just df as input
    # apply to each chunk
    df_pool_results = p.map(process, split_dfs)

    p.close()

    # combine together
    result = pd.concat(df_pool_results, axis=0)

    return result


    %time result_with_multiprocessing = test_with_multiprocessing(big_data, your_func)

    CPU times: user 984 ms, sys: 3.46 s, total: 4.44 s
    Wall time: 22.3 s

    现在,它要快得多,尤其是在 CPU 时间方面。虽然在我们拆分和重组结果时会有一些开销,但在使用 8 核处理器时,它预计比非多处理情况快 4 到 6 倍。

    最后,检查两个结果是否相同。
    import pandas.util.testing as pdt

    pdt.assert_frame_equal(result_no_multiprocessing, result_with_multiprocessing)

    漂亮地通过测试。

    关于 Pandas :优化我的代码 (groupby()/apply()),我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/30904354/

    25 4 0
    Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
    广告合作:1813099741@qq.com 6ren.com