gpt4 book ai didi

Python并行数据处理

转载 作者:行者123 更新时间:2023-12-01 02:32:03 24 4
gpt4 key购买 nike

我们有一个大约有 1.5MM 行的数据集。我想并行处理它。该代码的主要功能是查找主控信息并丰富1.5MM行。主数据集是一个两列数据集,大约有 25000 行。但是我无法使多进程工作并正确测试其可扩展性。有人可以帮忙吗?精简版代码如下

import pandas
from multiprocessing import Pool

def work(data):
mylist =[]
#Business Logic
return mylist.append(data)

if __name__ == '__main__':
data_df = pandas.read_csv('D:\\retail\\customer_sales_parallel.csv',header='infer')
print('Source Data :', data_df)
agents = 2
chunksize = 2
with Pool(processes=agents) as pool:
result = pool.map(func=work, iterable= data_df, chunksize=20)
pool.close()
pool.join()
print('Result :', result)

方法work将具有业务逻辑,我想将分区的data_df传递到work以启用并行处理。样本数据如下

CUSTOMER_ID,PRODUCT_ID,SALE_QTY
641996,115089,2
1078894,78144,1
1078894,121664,1
1078894,26467,1
457347,59359,2
1006860,36329,2
1006860,65237,2
1006860,121189,2
825486,78151,2
825486,78151,2
123445,115089,4

理想情况下,我想在每个分区中处理 6 行。

请帮忙。

感谢和问候

巴拉

最佳答案

首先,work 返回 mylist.append(data) 的输出,即 None。我假设(如果不是,我建议)你想返回一个处理过的 Dataframe。

要分配负载,您可以使用 numpy.array_split 将大型 Dataframe 拆分为 6 行 Dataframe 列表,然后由 work 处理。

import pandas
import math
import numpy as np
from multiprocessing import Pool

def work(data):
#Business Logic
return data # Return it as a Dataframe

if __name__ == '__main__':
data_df = pandas.read_csv('D:\\retail\\customer_sales_parallel.csv',header='infer')
print('Source Data :', data_df)
agents = 2
rows_per_workload = 6
num_loads = math.ceil(data_df.shape[0]/float(rows_per_workload))
split_df = np.array_split(data_df, num_loads) # A list of Dataframes
with Pool(processes=agents) as pool:
result = pool.map(func=work, iterable=split_df)
result = pandas.concat(result) # Stitch them back together
pool.close()
pool.join()pool = Pool(processes=agents)
print('Result :', result)

关于Python并行数据处理,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/46695467/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com