gpt4 book ai didi

python - 如何并行化行式 Pandas 数据框的 apply() 方法

转载 作者:行者123 更新时间:2023-12-04 14:58:22 25 4
gpt4 key购买 nike

我有以下代码:

import pandas as pd
import time

def enrich_str(str):

val1 = f'{str}_1'
val2 = f'{str}_2'
val3 = f'{str}_3'
time.sleep(3)

return val1, val2, val3

def enrich_row(passed_row):
col_name = str(passed_row['colName'])
my_string = str(passed_row[col_name])

val1, val2, val3 = enrich_str(my_string)

passed_row['enriched1'] = val1
passed_row['enriched2'] = val2
passed_row['enriched3'] = val3

return passed_row


df = pd.DataFrame({'numbers': [1, 2, 3, 4, 5], 'colors': ['red', 'white', 'blue', 'orange', 'red']},
columns=['numbers', 'colors'])

df['colName'] = 'colors'

tic = time.perf_counter()
enriched_df = df.apply(enrich_row, col_name='colors', axis=1)
toc = time.perf_counter()

print(f"{df.shape[0]} rows enriched in {toc - tic:0.4f} seconds")

enriched_df

需要 15 秒才能得到如下所示的输出数据帧:

enter image description here

现在我想在我的机器上使用多个线程并行化浓缩操作。我探索了很多解决方案,例如 Dasknumba,但对我来说似乎没有一个是直截了当的。

然后我偶然发现了 multiprocessing 库及其 pool.imaps() 方法。所以我尝试运行以下代码:

import multiprocessing as mp

tic = time.perf_counter()
pool = mp.Pool(5)
result = pool.imap(enrich_row, df.itertuples(), chunksize=1)
pool.close()
pool.join()
toc = time.perf_counter()

print(f"{df.shape[0]} rows enriched in {toc - tic:0.4f} seconds")
result

大约需要 2 秒,result 不是 Pandas 数据框。我不知道哪里出了问题。

最佳答案

我建议您使用 pathos fork multiprocessing,因为它将更好地处理数据帧的酸洗。 imap 返回一个迭代器,而不是 DataFrame,所以你必须将它转换回来:

def enrich_row(row_tuple):
passed_row = row_tuple[1]
col_name = str(passed_row['colName'])
my_string = str(passed_row[col_name])

val1, val2, val3 = enrich_str(my_string)

passed_row['enriched1'] = val1
passed_row['enriched2'] = val2
passed_row['enriched3'] = val3

return passed_row

df = pd.DataFrame({'numbers': [1, 2, 3, 4, 5], 'colors': ['red', 'white', 'blue', 'orange', 'red']},
columns=['numbers', 'colors'])

df['colName'] = 'colors'

from pathos.multiprocessing import Pool

tic = time.perf_counter()
result = Pool(8).imap(enrich_row, df.iterrows(), chunksize=1)
df = pd.DataFrame(result)
toc = time.perf_counter()

print(f"{df.shape[0]} rows enriched in {toc - tic:0.4f} seconds")
print(df)

请注意,我正在使用 df.iterrows(),它返回元组 (row_number, row) 的迭代器,因此我修改了 enrich_row 处理这种格式。

关于python - 如何并行化行式 Pandas 数据框的 apply() 方法,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/67457956/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com