gpt4 book ai didi

python - 如何同时计算一个巨大文件中的词频?

转载 作者:行者123 更新时间:2023-11-28 18:02:41 24 4
gpt4 key购买 nike

我需要计算一个 3GB 的英文句子 gzip 纯文本文件的词频,解压后大约有 30GB。

我有一个包含 collections.Countergzip.open 的单线程脚本,需要几个小时才能完成。

由于逐行读取文件比拆分和计数快得多,我正在考虑一个生产者-消费者流程,一个文件读取器产生行,几个消费者进行拆分和计数,最后合并Counter 获取单词出现的次数。

但是,我找不到 ProcessPoolExecutor 将队列发送到 Executor 的示例,它们只是 map 列表中的单个项目。asyncio.Queue 只有单线程示例。

  • 这是一个巨大的文件,所以我无法读取整个文件并在计数之前获取list,因此我无法使用concurrent.futures.Executor.map。但我阅读的所有示例都使用固定列表作为开始。

  • 拆分和统计一个句子的时间相当于 fork 一个进程,所以我必须让每个消费者进程的生命周期更长。我不认为 map 可以合并 Counter,所以我不能使用 chunksize>1。因此,我必须给消费者一个队列,让他们继续计数,直到整个文件完成。但是大多数示例只向消费者发送一个项目并使用 chunksize=1000 来减少 fork 次数。

你能为我写一个例子吗?

我希望代码向后兼容 Python 3.5.3,因为 PyPy 更快。


我的实际情况是针对更具体的文件格式:

chr1    10011   141     0       157     4       41      50
chr1 10012 146 1 158 4 42 51
chr1 10013 150 0 163 4 43 53
chr1 10014 164 3 167 4 44 54

我需要从第 3 列到第 8 列计算单列的每个直方图。所以我以词频作为一个更简单的例子。

我的代码是:

#!/usr/bin/env pypy3
import sys

SamplesList = ('D_Crick', 'D_Watson', 'Normal_Crick', 'Normal_Watson', 'D_WGS', 'Normal_WGS')

def main():
import math

if len(sys.argv) < 3 :
print('Usage:',sys.argv[0],'<samtools.depth.gz> <out.tsv> [verbose=0]',file=sys.stderr,flush=True)
exit(0)
try:
verbose = int(sys.argv[3])
except: # `except IndexError:` and `except ValueError:`
verbose = 0

inDepthFile = sys.argv[1]
outFile = sys.argv[2]
print('From:[{}], To:[{}].\nVerbose: [{}].'.format(inDepthFile,outFile,verbose),file=sys.stderr,flush=True)
RecordCnt,MaxDepth,cDepthCnt,cDepthStat = inStat(inDepthFile,verbose)
for k in SamplesList:
cDepthStat[k][2] = cDepthStat[k][0] / RecordCnt # E(X)
cDepthStat[k][3] = cDepthStat[k][1] / RecordCnt # E(X^2)
cDepthStat[k][4] = math.sqrt(cDepthStat[k][3] - cDepthStat[k][2]*cDepthStat[k][2]) # E(X^2)-E(X)^2
tsvout = open(outFile, 'wt')
print('#{}\t{}'.format('Depth','\t'.join(SamplesList)),file=tsvout)
#RecordCntLength = len(str(RecordCnt))
print( '#N={},SD:\t{}'.format(RecordCnt,'\t'.join(str(round(cDepthStat[col][4],1)) for col in SamplesList)),file=tsvout)
for depth in range(0,MaxDepth+1):
print( '{}\t{}'.format(depth,'\t'.join(str(cDepthCnt[col][depth]) for col in SamplesList)),file=tsvout)
tsvout.close()
pass

def inStat(inDepthFile,verbose):
import gzip
import csv
from collections import Counter
# Looking up things in global scope takes longer then looking up stuff in local scope. <https://stackoverflow.com/a/54645851/159695>
cDepthCnt = {key:Counter() for key in SamplesList}
cDepthStat = {key:[0,0,0,0,0] for key in SamplesList} # x and x^2
RecordCnt = 0
MaxDepth = 0
with gzip.open(inDepthFile, 'rt') as tsvin:
tsvin = csv.DictReader(tsvin, delimiter='\t', fieldnames=('ChrID','Pos')+SamplesList )
try:
for row in tsvin:
RecordCnt += 1
for k in SamplesList:
theValue = int(row[k])
if theValue > MaxDepth:
MaxDepth = theValue
cDepthCnt[k][theValue] += 1 # PyPy3:29.82 ns, Python3:30.61 ns
cDepthStat[k][0] += theValue
cDepthStat[k][1] += theValue * theValue
#print(MaxDepth,DepthCnt)
except KeyboardInterrupt:
print('\n[!]Ctrl+C pressed.',file=sys.stderr,flush=True)
pass
print('[!]Lines Read:[{}], MaxDepth is [{}].'.format(RecordCnt,MaxDepth),file=sys.stderr,flush=True)
return RecordCnt,MaxDepth,cDepthCnt,cDepthStat

if __name__ == "__main__":
main() # time python3 ./samdepthplot.py t.tsv.gz 1

csv.DictReader 花费最多的时间。

cProfile


我的问题是,虽然 gzip 阅读器很快,但 csv 阅读器很快,我需要计算数十亿行。而且 csv 阅读器肯定比 gzip 阅读器慢。

因此,我需要将行分布到 csv 阅读器的不同工作进程,并分别进行下游计数。在一个生产者和多个消费者之间使用队列很方便。

由于我使用的是 Python 而不是 C,是否有一些用于多处理和队列的抽象包装器?是否可以将 ProcessPoolExecutorQueue 类一起使用?

最佳答案

一个 30 GB 的文本文件足以将您的问题放入大数据领域。因此,为了解决这个问题,我建议使用 Hadoop 和 Spark 等大数据工具。您解释为“生产者-消费者流”的内容基本上就是 MapReduce 算法的设计目的。单词计数频率是一个典型的 MapReduce 问题。查找它,您会发现大量示例。

关于python - 如何同时计算一个巨大文件中的词频?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/55073002/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com