- c - 在位数组中找到第一个零
- linux - Unix 显示有关匹配两种模式之一的文件的信息
- 正则表达式替换多个文件
- linux - 隐藏来自 xtrace 的命令
我有一个简单的程序,它读取一个包含几百万行的大文件,解析每一行(numpy array
)并转换成 double 组(python array
)然后写入 hdf5 文件
。我重复这个循环好几天。读取每个文件后,我删除所有对象并调用垃圾收集器。当我运行该程序时,第一天的解析没有任何错误,但第二天我得到了 MemoryError
。我监控了程序的内存使用情况,在解析的第一天,内存使用量约为 1.5 GB。第一天的解析完成后,内存使用量下降到 50 MB。现在,当第 2 天开始时,我尝试从文件中读取行,我得到了 MemoryError
。以下是程序的输出。
source file extracted at C:\rfadump\au\2012.08.07.txt
parsing started
current time: 2012-09-16 22:40:16.829000
500000 lines parsed
1000000 lines parsed
1500000 lines parsed
2000000 lines parsed
2500000 lines parsed
3000000 lines parsed
3500000 lines parsed
4000000 lines parsed
4500000 lines parsed
5000000 lines parsed
parsing done.
end time is 2012-09-16 23:34:19.931000
total time elapsed 0:54:03.102000
repacking file
done
> s:\users\aaj\projects\pythonhf\rfadumptohdf.py(132)generateFiles()
-> while single_date <= self.end_date:
(Pdb) c
*** 2012-08-08 ***
source file extracted at C:\rfadump\au\2012.08.08.txt
cought an exception while generating file for day 2012-08-08.
Traceback (most recent call last):
File "rfaDumpToHDF.py", line 175, in generateFile
lines = self.rawfile.read().split('|\n')
MemoryError
我非常确定 Windows 系统任务管理器显示此进程的内存使用量为 50 MB。看起来 Python 的垃圾收集器或内存管理器没有正确计算可用内存。应该有很多空闲内存,但它认为没有足够的。
有什么想法吗?
编辑
在这里添加我的代码
我会放上我的部分代码。我是 python 新手,请原谅我的 python 编码风格。
模块 1
def generateFile(self, current_date):
try:
print "*** %s ***" % current_date.strftime("%Y-%m-%d")
weekday=current_date.weekday()
if weekday >= 5:
print "skipping weekend"
return
self.taqdb = taqDB(self.index, self.offset)
cache_filename = os.path.join(self.cache_dir,current_date.strftime("%Y.%m.%d.h5"))
outputFile = config.hdf5.filePath(self.index, date=current_date)
print "cache file: ", cache_filename
print "output file: ", outputFile
tempdir = "C:\\rfadump\\"+self.region+"\\"
input_filename = tempdir + filename
print "source file extracted at %s " % input_filename
## universe
reader = rfaTextToTAQ.rfaTextToTAQ(self.tickobj) ## PARSER
count = 0
self.rawfile = open(input_filename, 'r')
lines = self.rawfile.read().split('|\n')
total_lines = len(lines)
self.rawfile.close()
del self.rawfile
print "parsing started"
start_time = dt.datetime.now()
print "current time: %s" % start_time
#while(len(lines) > 0):
while(count < total_lines):
#line = lines.pop(0) ## This slows down processing
result = reader.parseline(lines[count]+"|")
count += 1
if(count % 500000 == 0):
print "%d lines parsed" %(count)
if(result == None):
continue
ric, timestamp, quotes, trades, levelsUpdated, tradeupdate = result
if(len(levelsUpdated) == 0 and tradeupdate == False):
continue
self.taqdb.insert(result)
## write to hdf5 TODO
writer = h5Writer.h5Writer(cache_filename, self.tickobj)
writer.write(self.taqdb.groups)
writer.close()
del lines
del self.taqdb, self.tickobj
##########################################################
print "parsing done."
end_time = dt.datetime.now()
print "end time is %s" % end_time
print "total time elapsed %s" % (end_time - start_time)
defragger = hdf.HDF5Defragmenter()
defragger.Defrag(cache_filename,outputFile)
del defragger
print "done"
gc.collect(2)
except:
print "cought an exception while generating file for day %s." % current_date.strftime("%Y-%m-%d")
tb = traceback.format_exc()
print tb
模块 2 - taqdb - 将解析后的数据存储在数组中
class taqDB:
def __init__(self, index, offset):
self.index = index
self.tickcfg = config.hdf5.getTickConfig(index)
self.offset = offset
self.groups = {}
def getGroup(self,ric):
if (self.groups.has_key(ric) == False):
self.groups[ric] = {}
return self.groups[ric]
def getOrderbookArray(self, ric, group):
datasetname = orderBookName
prodtype = self.tickcfg.getProdType(ric)
if(prodtype == ProdType.INDEX):
return
orderbookArrayShape = self.tickcfg.getOrderBookArrayShape(prodtype)
if(group.has_key(datasetname) == False):
group[datasetname] = array.array("d")
orderbookArray = self.tickcfg.getOrderBookArray(prodtype)
return orderbookArray
else:
orderbookArray = group[datasetname]
if(len(orderbookArray) == 0):
return self.tickcfg.getOrderBookArray(prodtype)
lastOrderbook = orderbookArray[-orderbookArrayShape[1]:]
return np.array([lastOrderbook])
def addToDataset(self, group, datasetname, timestamp, arr):
if(group.has_key(datasetname) == False):
group[datasetname] = array.array("d")
arr[0,0]=timestamp
a1 = group[datasetname]
a1.extend(arr[0])
def addToOrderBook(self, group, timestamp, arr):
self.addToDataset(self, group, orderBookName, timestamp, arr)
def insert(self, data):
ric, timestamp, quotes, trades, levelsUpdated, tradeupdate = data
delta = dt.timedelta(hours=timestamp.hour,minutes=timestamp.minute, seconds=timestamp.second, microseconds=(timestamp.microsecond/1000))
timestamp = float(str(delta.seconds)+'.'+str(delta.microseconds)) + self.offset
## write to array
group = self.getGroup(ric)
orderbookUpdate = False
orderbookArray = self.getOrderbookArray(ric, group)
nonzero = quotes.nonzero()
orderbookArray[nonzero] = quotes[nonzero]
if(np.any(nonzero)):
self.addToDataset(group, orderBookName, timestamp, orderbookArray)
if(tradeupdate == True):
self.addToDataset(group, tradeName, timestamp, trades)
模块 3 - 解析器
class rfaTextToTAQ:
"""RFA Raw dump file reader. Readers single line (record) and returns an array or array of fid value pairs."""
def __init__(self,tickconfig):
self.tickconfig = tickconfig
self.token = ''
self.state = ReadState.SEQ_NUM
self.fvstate = fvstate.FID
self.quotes = np.array([]) # read from tickconfig
self.trades = np.array([]) # read from tickconfig
self.prodtype = ProdType.STOCK
self.allquotes = {}
self.alltrades = {}
self.acvol = 0
self.levelsUpdated = []
self.quoteUpdate = False
self.tradeUpdate = False
self.depth = 0
def updateLevel(self, index):
if(self.levelsUpdated.__contains__(index) == False):
self.levelsUpdated.append(index)
def updateQuote(self, fidindex, field):
self.value = float(self.value)
if(self.depth == 1):
index = fidindex[0]+(len(self.tickconfig.stkQuotes)*(self.depth - 1))
self.quotes[index[0]][fidindex[1][0]] = self.value
self.updateLevel(index[0])
else:
self.quotes[fidindex] = self.value
self.updateLevel(fidindex[0][0])
self.quoteUpdate = True
def updateTrade(self, fidindex, field):
#self.value = float(self.value)
if(self.tickconfig.tradeUpdate(self.depth) == False):
return
newacvol = float(self.value)
if(field == acvol):
if(self.value > self.acvol):
tradesize = newacvol - self.acvol
self.acvol = newacvol
self.trades[fidindex] = tradesize
if(self.trades.__contains__(0) == False):
self.tradeUpdate = True
else:
self.trades[fidindex] = self.value
if(not (self.trades[0,1]==0 or self.trades[0,2]==0)):
self.tradeUpdate = True
def updateResult(self):
field = ''
valid, field = field_dict.FIDToField(int(self.fid), field)
if(valid == False):
return
if(self.value == '0'):
return
if(self.prodtype == ProdType.STOCK):
fidindex = np.where(self.tickconfig.stkQuotes == field)
if(len(fidindex[0]) == 0):
fidindex = np.where(self.tickconfig.stkTrades == field)
if(len(fidindex[0]) == 0):
return
else:
self.updateTrade(fidindex, field)
else:
self.updateQuote(fidindex, field)
else:
fidindex = np.where(self.tickconfig.futQuotes == field)
if(len(fidindex[0]) == 0):
fidindex = np.where(self.tickconfig.futTrades == field)
if(len(fidindex[0]) == 0):
return
else:
self.updateTrade(fidindex, field)
else:
self.updateQuote(fidindex, field)
def getOrderBookTrade(self):
if (self.allquotes.has_key(self.ric) == False):
acvol = 0
self.allquotes[self.ric] = self.tickconfig.getOrderBookArray(self.prodtype)
trades = self.tickconfig.getTradesArray()
self.alltrades[self.ric] = [trades, acvol]
return self.allquotes[self.ric], self.alltrades[self.ric]
def parseline(self, line):
self.tradeUpdate = False
self.levelsUpdated = []
pos = 0
length = len(line)
self.state = ReadState.SEQ_NUM
self.fvstate = fvstate.FID
self.token = ''
ch = ''
while(pos < length):
prevChar = ch
ch = line[pos]
pos += 1
#SEQ_NUM
if(self.state == ReadState.SEQ_NUM):
if(ch != ','):
self.token += ch
else:
self.seq_num = int(self.token)
self.state = ReadState.TIMESTAMP
self.token = ''
# TIMESTAMP
elif(self.state == ReadState.TIMESTAMP):
if(ch == ' '):
self.token = ''
elif(ch != ','):
self.token += ch
else:
if(len(self.token) != 12):
print "Invalid timestamp format. %s. skipping line.\n", self.token
self.state = ReadState.SKIPLINE
else:
self.timestamp = datetime.strptime(self.token,'%H:%M:%S.%f')
self.state = ReadState.RIC
self.token = ''
# RIC
elif(self.state == ReadState.RIC):
if(ch != ','):
self.token += ch
else:
self.ric = self.token
self.token = ''
self.ric, self.depth = self.tickconfig.replaceRic(self.ric)
self.prodtype = self.tickconfig.getProdType(self.ric)
if(self.tickconfig.subscribed(self.ric)):
self.state = ReadState.UPDATE_TYPE
self.quotes, trades = self.getOrderBookTrade()
self.trades = trades[0]
self.acvol = trades[1]
else:
self.state = ReadState.SKIPLINE
# UPDATE_TYPE
elif(self.state == ReadState.UPDATE_TYPE):
if(ch != '|'):
self.token += ch
else:
self.update_type = self.token
self.token = ''
self.state = ReadState.FVPAIRS
#SKIPLINE
elif(self.state == ReadState.SKIPLINE):
return None
# FV PAIRS
elif(self.state == ReadState.FVPAIRS):
# FID
if(self.fvstate == fvstate.FID):
if(ch != ','):
if(ch.isdigit() == False):
self.token = self.value+ch
self.fvstate = fvstate.FIDVALUE
self.state = ReadState.FVPAIRS
else:
self.token += ch
else:
self.fid = self.token
self.token = ''
self.fvstate = fvstate.FIDVALUE
self.state = ReadState.FVPAIRS
# FIDVALUE
elif(self.fvstate == fvstate.FIDVALUE):
if(ch != '|'):
self.token += ch
else:
self.value = self.token
self.token = ''
self.state = ReadState.FVPAIRS
self.fvstate = fvstate.FID
# TODO set value
self.updateResult()
return self.ric, self.timestamp, self.quotes, self.trades, self.levelsUpdated, self.tradeUpdate
谢谢。
最佳答案
The only reliable way to free memory is to terminate the process.
所以,如果你的主程序spawns a worker process完成大部分工作(一天内完成的工作)然后当该工作进程完成时,将释放使用的内存:
import multiprocessing as mp
def work(date):
# Do most of the memory-intensive work here
...
while single_date <= self.end_date:
proc = mp.Process(target = work, args = (single_date,))
proc.start()
proc.join()
关于python - Python 垃圾收集器有问题吗?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/12452017/
我正在处理一组标记为 160 个组的 173k 点。我想通过合并最接近的(到 9 或 10 个组)来减少组/集群的数量。我搜索过 sklearn 或类似的库,但没有成功。 我猜它只是通过 knn 聚类
我有一个扁平数字列表,这些数字逻辑上以 3 为一组,其中每个三元组是 (number, __ignored, flag[0 or 1]),例如: [7,56,1, 8,0,0, 2,0,0, 6,1,
我正在使用 pipenv 来管理我的包。我想编写一个 python 脚本来调用另一个使用不同虚拟环境(VE)的 python 脚本。 如何运行使用 VE1 的 python 脚本 1 并调用另一个 p
假设我有一个文件 script.py 位于 path = "foo/bar/script.py"。我正在寻找一种在 Python 中通过函数 execute_script() 从我的主要 Python
这听起来像是谜语或笑话,但实际上我还没有找到这个问题的答案。 问题到底是什么? 我想运行 2 个脚本。在第一个脚本中,我调用另一个脚本,但我希望它们继续并行,而不是在两个单独的线程中。主要是我不希望第
我有一个带有 python 2.5.5 的软件。我想发送一个命令,该命令将在 python 2.7.5 中启动一个脚本,然后继续执行该脚本。 我试过用 #!python2.7.5 和http://re
我在 python 命令行(使用 python 2.7)中,并尝试运行 Python 脚本。我的操作系统是 Windows 7。我已将我的目录设置为包含我所有脚本的文件夹,使用: os.chdir("
剧透:部分解决(见最后)。 以下是使用 Python 嵌入的代码示例: #include int main(int argc, char** argv) { Py_SetPythonHome
假设我有以下列表,对应于及时的股票价格: prices = [1, 3, 7, 10, 9, 8, 5, 3, 6, 8, 12, 9, 6, 10, 13, 8, 4, 11] 我想确定以下总体上最
所以我试图在选择某个单选按钮时更改此框架的背景。 我的框架位于一个类中,并且单选按钮的功能位于该类之外。 (这样我就可以在所有其他框架上调用它们。) 问题是每当我选择单选按钮时都会出现以下错误: co
我正在尝试将字符串与 python 中的正则表达式进行比较,如下所示, #!/usr/bin/env python3 import re str1 = "Expecting property name
考虑以下原型(prototype) Boost.Python 模块,该模块从单独的 C++ 头文件中引入类“D”。 /* file: a/b.cpp */ BOOST_PYTHON_MODULE(c)
如何编写一个程序来“识别函数调用的行号?” python 检查模块提供了定位行号的选项,但是, def di(): return inspect.currentframe().f_back.f_l
我已经使用 macports 安装了 Python 2.7,并且由于我的 $PATH 变量,这就是我输入 $ python 时得到的变量。然而,virtualenv 默认使用 Python 2.6,除
我只想问如何加快 python 上的 re.search 速度。 我有一个很长的字符串行,长度为 176861(即带有一些符号的字母数字字符),我使用此函数测试了该行以进行研究: def getExe
list1= [u'%app%%General%%Council%', u'%people%', u'%people%%Regional%%Council%%Mandate%', u'%ppp%%Ge
这个问题在这里已经有了答案: Is it Pythonic to use list comprehensions for just side effects? (7 个答案) 关闭 4 个月前。 告
我想用 Python 将两个列表组合成一个列表,方法如下: a = [1,1,1,2,2,2,3,3,3,3] b= ["Sun", "is", "bright", "June","and" ,"Ju
我正在运行带有最新 Boost 发行版 (1.55.0) 的 Mac OS X 10.8.4 (Darwin 12.4.0)。我正在按照说明 here构建包含在我的发行版中的教程 Boost-Pyth
学习 Python,我正在尝试制作一个没有任何第 3 方库的网络抓取工具,这样过程对我来说并没有简化,而且我知道我在做什么。我浏览了一些在线资源,但所有这些都让我对某些事情感到困惑。 html 看起来
我是一名优秀的程序员,十分优秀!