gpt4 book ai didi

Python 分块 CSV 文件多处理

转载 作者:太空狗 更新时间:2023-10-29 17:59:24 27 4
gpt4 key购买 nike

我正在使用以下代码将 CSV 文件拆分为多个 block (来自 here)

def worker(chunk):
print len(chunk)

def keyfunc(row):
return row[0]

def main():
pool = mp.Pool()
largefile = 'Counseling.csv'
num_chunks = 10
start_time = time.time()
results = []
with open(largefile) as f:
reader = csv.reader(f)
reader.next()
chunks = itertools.groupby(reader, keyfunc)
while True:
# make a list of num_chunks chunks
groups = [list(chunk) for key, chunk in
itertools.islice(chunks, num_chunks)]
if groups:
result = pool.map(worker, groups)
results.extend(result)
else:
break
pool.close()
pool.join()

但是,无论我选择使用多少 block , block 的数量似乎总是保持不变。例如,无论我选择有 1 个还是 10 个 block ,我在处理样本文件时总是得到这个输出。理想情况下,我想对文件进行分 block ,以便公平地分布。

请注意,我分 block 的真实文件超过 1300 万行,这就是我逐个处理它的原因。那是必须的!

6
7
1
...
1
1
94
--- 0.101687192917 seconds ---

最佳答案

根据 thecomments ,我们希望每个进程都在 10000 行的 block 上工作。这并不难去做;请参阅下面的 iter/islice 配方。但是,使用的问题

pool.map(worker, ten_thousand_row_chunks)

pool.map 会尝试将所有 block 放入任务队列立刻。如果这需要比可用内存更多的内存,那么你会得到一个内存错误。 (注:pool.imap suffers from the same problem .)

因此,我们需要在每个 block 的片段上迭代调用 pool.map

import itertools as IT
import multiprocessing as mp
import csv

def worker(chunk):
return len(chunk)

def main():
# num_procs is the number of workers in the pool
num_procs = mp.cpu_count()
# chunksize is the number of lines in a chunk
chunksize = 10**5

pool = mp.Pool(num_procs)
largefile = 'Counseling.csv'
results = []
with open(largefile, 'rb') as f:
reader = csv.reader(f)
for chunk in iter(lambda: list(IT.islice(reader, chunksize*num_procs)), []):
chunk = iter(chunk)
pieces = list(iter(lambda: list(IT.islice(chunk, chunksize)), []))
result = pool.map(worker, pieces)
results.extend(result)
print(results)
pool.close()
pool.join()

main()

每个 chunk 最多包含文件中的 chunksize*num_procs 行。这是足够的数据来为池中的所有工作人员提供一些工作,但不会太大而导致 MemoryError -- 前提是 chunksize 没有设置太大。

然后每个 chunk 被分成几 block ,每 block 最多包含文件中的 chunksize 行。然后将这些片段发送到 pool.map


iter(lambda: list(IT.islice(iterator, chunksize)), []) 是如何工作的:

这是一个习惯用法,用于将迭代器分组为长度为 chunksize 的 block 。让我们看看它是如何在一个例子中工作的:

In [111]: iterator = iter(range(10))

请注意,每次调用 IT.islice(iterator, 3) 时,一个包含 3 个项目的新 block 从迭代器中切出:

In [112]: list(IT.islice(iterator, 3))
Out[112]: [0, 1, 2]

In [113]: list(IT.islice(iterator, 3))
Out[113]: [3, 4, 5]

In [114]: list(IT.islice(iterator, 3))
Out[114]: [6, 7, 8]

当迭代器中剩余的项少于 3 个时,只返回剩余的项:

In [115]: list(IT.islice(iterator, 3))
Out[115]: [9]

如果你再次调用它,你会得到一个空列表:

In [116]: list(IT.islice(iterable, 3))
Out[116]: []

lambda: list(IT.islice(iterator, chunksize)) 是一个在调用时返回 list(IT.islice(iterator, chunksize)) 的函数。它是一个“单行”,相当于

def func():
return list(IT.islice(iterator, chunksize))

最后,iter(callable, sentinel) 返回另一个迭代器。此迭代器生成的值是可调用对象返回的值。它不断产生值,直到可调用对象返回一个等于标记的值。所以

iter(lambda: list(IT.islice(iterator, chunksize)), [])

将继续返回值 list(IT.islice(iterator, chunksize)) 直到该值为空列表:

In [121]: iterator = iter(range(10))

In [122]: list(iter(lambda: list(IT.islice(iterator, 3)), []))
Out[122]: [[0, 1, 2], [3, 4, 5], [6, 7, 8], [9]]

关于Python 分块 CSV 文件多处理,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/31164731/

27 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com