gpt4 book ai didi

python - 用于写入 csv 的多处理

转载 作者:行者123 更新时间:2023-12-04 13:23:49 26 4
gpt4 key购买 nike

我正在尝试将一个巨大的数据集 ~146m 行写入 CSV。我试过这个:

def paramlist():
for row in nodes.itertuples():
l = []
for row2 in ref_stops.itertuples():
l.append((row[1], row[2], row[3], row2[1],
row2[2], row2[3], row2[4], haversine(row[3], row[2], row2[3], row2[2])))
yield l

pool = multiprocessing.Pool()
pool.map(func, paramlist())

def func(params):
with open(r'big_file.csv', 'a') as f:
writer = csv.writer(f)
for row in params:
writer.writerow(row)

这段代码有效,但它吃掉了我所有的 RAM 并中止。我该如何优化它?

最佳答案

pool.map在将部分迭代提交给池的工作人员之前,将消耗整个迭代。这就是您遇到内存问题的原因。
您应该使用 pool.imap 而是为了避免这种情况。见 this post进行彻底的解释。

话虽如此,我真诚地怀疑多处理会以您编写的方式加速您的程序,因为瓶颈是磁盘 I/O。一遍又一遍地打开、追加和关闭文件几乎不会比一次顺序写入快。并行写入单个文件是不可能的。

假设生成l需要一些时间,如果您像这样编写程序,可能会加速:

from contextlib import closing
import multiprocessing
import csv
import pandas as pd
import numpy as np

# Just for testing
ref_stops = pd.DataFrame(np.arange(100).reshape((-1, 5)))
nodes = pd.DataFrame(np.arange(400).reshape((-1, 4)))
def haversine(a, b, c, d):
return a*b*c*d

# This function will be executed by the workers
def join_rows(row):
row_list = []
# join row with all rows from `ref_stops` and compute haversine
for row2 in ref_stops.itertuples():
row_list.append((row[1], row[2], row[3],
row2[1], row2[2], row2[3], row2[4],
haversine(row[3], row[2], row2[3], row2[2])))
return row_list


def main():
with closing(multiprocessing.Pool()) as pool:
# joined_rows will contain lists of joined rows in arbitrary order.
# use name=None so we get proper tuples, pandas named tuples cannot be pickled, see https://github.com/pandas-dev/pandas/issues/11791
joined_rows = pool.imap_unordered(join_rows, nodes.itertuples(name=None))

# open file and write out all rows from incoming lists of rows
with open(r'big_file.csv', 'w') as f:
writer = csv.writer(f)
for row_list in joined_rows:
writer.writerows(row_list)

if __name__ == '__main__':
main()

我假设您不关心订单,否则您一开始就不会选择多处理,对吗?
这样,它不是生成行列表的主进程,而是工作进程。一旦一个工作进程完成了一个列表,它就会将它返回给主进程,然后主进程将其条目附加到文件中。然后工作人员获取一个新行并开始构建另一个列表。

一般来说,在程序中使用更多的 Pandas 功能可能会更好(我假设您使用的是 Pandas 数据帧,因为 itertuples )。例如,您可以创建一个新的 Dataframe 而不是行列表并创建 haversine兼容 pandas.Series对象,因此您不必在每个条目上都调用它。

关于python - 用于写入 csv 的多处理,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/43804728/

26 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com