
python - Is there a problem with the Python garbage collector?


I have a simple program that reads a large file containing a few million lines, parses each line (numpy array), converts it into an array of doubles (python array), and then writes it to an HDF5 file. I repeat this loop for several days' worth of data. After reading each file I delete all the objects and call the garbage collector. When I run the program, the first day is parsed without any error, but on the second day I get a MemoryError. I monitored the program's memory usage: during the first day of parsing, memory usage is around 1.5 GB. When the first day's parsing is finished, memory usage drops to 50 MB. Now when the second day starts and I try to read the lines from the file, I get a MemoryError. Below is the output of the program.

source file extracted at C:\rfadump\au\2012.08.07.txt
parsing started
current time: 2012-09-16 22:40:16.829000
500000 lines parsed
1000000 lines parsed
1500000 lines parsed
2000000 lines parsed
2500000 lines parsed
3000000 lines parsed
3500000 lines parsed
4000000 lines parsed
4500000 lines parsed
5000000 lines parsed
parsing done.
end time is 2012-09-16 23:34:19.931000
total time elapsed 0:54:03.102000
repacking file
done
> s:\users\aaj\projects\pythonhf\rfadumptohdf.py(132)generateFiles()
-> while single_date <= self.end_date:
(Pdb) c
*** 2012-08-08 ***
source file extracted at C:\rfadump\au\2012.08.08.txt
cought an exception while generating file for day 2012-08-08.
Traceback (most recent call last):
File "rfaDumpToHDF.py", line 175, in generateFile
lines = self.rawfile.read().split('|\n')
MemoryError

I am quite sure that the Windows task manager shows the memory usage for this process as 50 MB. It looks as if Python's garbage collector, or its memory manager, is not calculating the free memory correctly. There should be plenty of free memory, but it thinks there is not enough.
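For reference, the statement that raises the MemoryError boils down to the pattern sketched below (the path is the one from the log above). It needs one contiguous string holding the whole file plus a list of all the record strings at the same moment, so on a 32-bit Python, whose address space is limited to roughly 2 GB and can become fragmented, the allocation can fail even though the operating system reports plenty of free RAM. This is only an illustration of the allocation pattern, not new code from the question:

# Sketch of the allocation pattern behind the failing line.
with open(r'C:\rfadump\au\2012.08.08.txt', 'r') as f:
    data = f.read()            # the whole file as one contiguous string
lines = data.split('|\n')      # plus a list holding every record again
# Peak memory here is roughly twice the file size, all needed at once.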

Any ideas?

EDIT

Adding my code here.

I will post part of my code. I am new to Python, so please excuse my Python coding style.

Module 1

def generateFile(self, current_date):
    try:
        print "*** %s ***" % current_date.strftime("%Y-%m-%d")
        weekday = current_date.weekday()
        if weekday >= 5:
            print "skipping weekend"
            return
        self.taqdb = taqDB(self.index, self.offset)
        cache_filename = os.path.join(self.cache_dir, current_date.strftime("%Y.%m.%d.h5"))
        outputFile = config.hdf5.filePath(self.index, date=current_date)
        print "cache file: ", cache_filename
        print "output file: ", outputFile

        tempdir = "C:\\rfadump\\" + self.region + "\\"
        input_filename = tempdir + filename
        print "source file extracted at %s " % input_filename

        ## universe
        reader = rfaTextToTAQ.rfaTextToTAQ(self.tickobj)  ## PARSER
        count = 0
        self.rawfile = open(input_filename, 'r')
        lines = self.rawfile.read().split('|\n')
        total_lines = len(lines)
        self.rawfile.close()
        del self.rawfile
        print "parsing started"
        start_time = dt.datetime.now()
        print "current time: %s" % start_time
        #while(len(lines) > 0):
        while(count < total_lines):
            #line = lines.pop(0) ## This slows down processing
            result = reader.parseline(lines[count] + "|")
            count += 1
            if(count % 500000 == 0):
                print "%d lines parsed" % (count)
            if(result == None):
                continue
            ric, timestamp, quotes, trades, levelsUpdated, tradeupdate = result
            if(len(levelsUpdated) == 0 and tradeupdate == False):
                continue
            self.taqdb.insert(result)

        ## write to hdf5 TODO
        writer = h5Writer.h5Writer(cache_filename, self.tickobj)
        writer.write(self.taqdb.groups)
        writer.close()

        del lines
        del self.taqdb, self.tickobj
        ##########################################################
        print "parsing done."
        end_time = dt.datetime.now()
        print "end time is %s" % end_time
        print "total time elapsed %s" % (end_time - start_time)

        defragger = hdf.HDF5Defragmenter()
        defragger.Defrag(cache_filename, outputFile)
        del defragger
        print "done"
        gc.collect(2)
    except:
        print "cought an exception while generating file for day %s." % current_date.strftime("%Y-%m-%d")
        tb = traceback.format_exc()
        print tb
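As an editorial aside, not part of the original question: a streaming alternative to the read().split('|\n') call above, which never holds the whole file in memory, could look roughly like the sketch below. iter_records is a hypothetical helper; the '|\n' record separator is taken from the code above.

def iter_records(path, separator='|\n', chunk_size=1 << 20):
    """Yield one record at a time instead of loading the whole file."""
    buf = ''
    with open(path, 'r') as f:
        while True:
            chunk = f.read(chunk_size)   # read about 1 MB at a time
            if not chunk:
                break
            buf += chunk
            parts = buf.split(separator)
            buf = parts.pop()            # keep the trailing partial record
            for record in parts:
                yield record
    if buf:
        yield buf                        # final record with no trailing separator

With something like this, the parsing loop could read for line in iter_records(input_filename): result = reader.parseline(line + "|"), so peak memory is bounded by one chunk rather than by the whole file.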

Module 2 - taqdb - stores the parsed data in arrays

class taqDB:
    def __init__(self, index, offset):
        self.index = index
        self.tickcfg = config.hdf5.getTickConfig(index)
        self.offset = offset
        self.groups = {}

    def getGroup(self, ric):
        if (self.groups.has_key(ric) == False):
            self.groups[ric] = {}
        return self.groups[ric]

    def getOrderbookArray(self, ric, group):
        datasetname = orderBookName
        prodtype = self.tickcfg.getProdType(ric)
        if(prodtype == ProdType.INDEX):
            return
        orderbookArrayShape = self.tickcfg.getOrderBookArrayShape(prodtype)
        if(group.has_key(datasetname) == False):
            group[datasetname] = array.array("d")
            orderbookArray = self.tickcfg.getOrderBookArray(prodtype)
            return orderbookArray
        else:
            orderbookArray = group[datasetname]
            if(len(orderbookArray) == 0):
                return self.tickcfg.getOrderBookArray(prodtype)
            lastOrderbook = orderbookArray[-orderbookArrayShape[1]:]
            return np.array([lastOrderbook])

    def addToDataset(self, group, datasetname, timestamp, arr):
        if(group.has_key(datasetname) == False):
            group[datasetname] = array.array("d")
        arr[0,0] = timestamp
        a1 = group[datasetname]
        a1.extend(arr[0])

    def addToOrderBook(self, group, timestamp, arr):
        self.addToDataset(self, group, orderBookName, timestamp, arr)

    def insert(self, data):
        ric, timestamp, quotes, trades, levelsUpdated, tradeupdate = data
        delta = dt.timedelta(hours=timestamp.hour, minutes=timestamp.minute, seconds=timestamp.second, microseconds=(timestamp.microsecond/1000))
        timestamp = float(str(delta.seconds)+'.'+str(delta.microseconds)) + self.offset
        ## write to array
        group = self.getGroup(ric)

        orderbookUpdate = False
        orderbookArray = self.getOrderbookArray(ric, group)
        nonzero = quotes.nonzero()
        orderbookArray[nonzero] = quotes[nonzero]
        if(np.any(nonzero)):
            self.addToDataset(group, orderBookName, timestamp, orderbookArray)
        if(tradeupdate == True):
            self.addToDataset(group, tradeName, timestamp, trades)

Module 3 - the parser

class rfaTextToTAQ:
    """RFA raw dump file reader. Reads a single line (record) and returns an array of fid-value pairs."""
    def __init__(self, tickconfig):
        self.tickconfig = tickconfig
        self.token = ''
        self.state = ReadState.SEQ_NUM
        self.fvstate = fvstate.FID
        self.quotes = np.array([]) # read from tickconfig
        self.trades = np.array([]) # read from tickconfig
        self.prodtype = ProdType.STOCK
        self.allquotes = {}
        self.alltrades = {}
        self.acvol = 0
        self.levelsUpdated = []
        self.quoteUpdate = False
        self.tradeUpdate = False
        self.depth = 0

    def updateLevel(self, index):
        if(self.levelsUpdated.__contains__(index) == False):
            self.levelsUpdated.append(index)

    def updateQuote(self, fidindex, field):
        self.value = float(self.value)
        if(self.depth == 1):
            index = fidindex[0] + (len(self.tickconfig.stkQuotes) * (self.depth - 1))
            self.quotes[index[0]][fidindex[1][0]] = self.value
            self.updateLevel(index[0])
        else:
            self.quotes[fidindex] = self.value
            self.updateLevel(fidindex[0][0])
        self.quoteUpdate = True

    def updateTrade(self, fidindex, field):
        #self.value = float(self.value)
        if(self.tickconfig.tradeUpdate(self.depth) == False):
            return
        newacvol = float(self.value)
        if(field == acvol):
            if(self.value > self.acvol):
                tradesize = newacvol - self.acvol
                self.acvol = newacvol
                self.trades[fidindex] = tradesize
                if(self.trades.__contains__(0) == False):
                    self.tradeUpdate = True
        else:
            self.trades[fidindex] = self.value
            if(not (self.trades[0,1] == 0 or self.trades[0,2] == 0)):
                self.tradeUpdate = True

    def updateResult(self):
        field = ''
        valid, field = field_dict.FIDToField(int(self.fid), field)
        if(valid == False):
            return
        if(self.value == '0'):
            return
        if(self.prodtype == ProdType.STOCK):
            fidindex = np.where(self.tickconfig.stkQuotes == field)
            if(len(fidindex[0]) == 0):
                fidindex = np.where(self.tickconfig.stkTrades == field)
                if(len(fidindex[0]) == 0):
                    return
                else:
                    self.updateTrade(fidindex, field)
            else:
                self.updateQuote(fidindex, field)
        else:
            fidindex = np.where(self.tickconfig.futQuotes == field)
            if(len(fidindex[0]) == 0):
                fidindex = np.where(self.tickconfig.futTrades == field)
                if(len(fidindex[0]) == 0):
                    return
                else:
                    self.updateTrade(fidindex, field)
            else:
                self.updateQuote(fidindex, field)

    def getOrderBookTrade(self):
        if (self.allquotes.has_key(self.ric) == False):
            acvol = 0
            self.allquotes[self.ric] = self.tickconfig.getOrderBookArray(self.prodtype)
            trades = self.tickconfig.getTradesArray()
            self.alltrades[self.ric] = [trades, acvol]
        return self.allquotes[self.ric], self.alltrades[self.ric]

    def parseline(self, line):
        self.tradeUpdate = False
        self.levelsUpdated = []
        pos = 0
        length = len(line)
        self.state = ReadState.SEQ_NUM
        self.fvstate = fvstate.FID
        self.token = ''
        ch = ''
        while(pos < length):
            prevChar = ch
            ch = line[pos]
            pos += 1
            #SEQ_NUM
            if(self.state == ReadState.SEQ_NUM):
                if(ch != ','):
                    self.token += ch
                else:
                    self.seq_num = int(self.token)
                    self.state = ReadState.TIMESTAMP
                    self.token = ''
            # TIMESTAMP
            elif(self.state == ReadState.TIMESTAMP):
                if(ch == ' '):
                    self.token = ''
                elif(ch != ','):
                    self.token += ch
                else:
                    if(len(self.token) != 12):
                        print "Invalid timestamp format. %s. skipping line.\n", self.token
                        self.state = ReadState.SKIPLINE
                    else:
                        self.timestamp = datetime.strptime(self.token, '%H:%M:%S.%f')
                        self.state = ReadState.RIC
                    self.token = ''
            # RIC
            elif(self.state == ReadState.RIC):
                if(ch != ','):
                    self.token += ch
                else:
                    self.ric = self.token
                    self.token = ''
                    self.ric, self.depth = self.tickconfig.replaceRic(self.ric)
                    self.prodtype = self.tickconfig.getProdType(self.ric)
                    if(self.tickconfig.subscribed(self.ric)):
                        self.state = ReadState.UPDATE_TYPE
                        self.quotes, trades = self.getOrderBookTrade()
                        self.trades = trades[0]
                        self.acvol = trades[1]
                    else:
                        self.state = ReadState.SKIPLINE
            # UPDATE_TYPE
            elif(self.state == ReadState.UPDATE_TYPE):
                if(ch != '|'):
                    self.token += ch
                else:
                    self.update_type = self.token
                    self.token = ''
                    self.state = ReadState.FVPAIRS
            #SKIPLINE
            elif(self.state == ReadState.SKIPLINE):
                return None
            # FV PAIRS
            elif(self.state == ReadState.FVPAIRS):
                # FID
                if(self.fvstate == fvstate.FID):
                    if(ch != ','):
                        if(ch.isdigit() == False):
                            self.token = self.value + ch
                            self.fvstate = fvstate.FIDVALUE
                            self.state = ReadState.FVPAIRS
                        else:
                            self.token += ch
                    else:
                        self.fid = self.token
                        self.token = ''
                        self.fvstate = fvstate.FIDVALUE
                        self.state = ReadState.FVPAIRS
                # FIDVALUE
                elif(self.fvstate == fvstate.FIDVALUE):
                    if(ch != '|'):
                        self.token += ch
                    else:
                        self.value = self.token
                        self.token = ''
                        self.state = ReadState.FVPAIRS
                        self.fvstate = fvstate.FID
                        # TODO set value
                        self.updateResult()
        return self.ric, self.timestamp, self.quotes, self.trades, self.levelsUpdated, self.tradeUpdate

Thanks.

Best answer

The only reliable way to free memory is to terminate the process.

So, if your main program spawns a worker process to do the bulk of the work (the work done within one day), then when that worker process completes, the memory it used is released:

import multiprocessing as mp

def work(date):
    # Do most of the memory-intensive work here
    ...

while single_date <= self.end_date:
    proc = mp.Process(target = work, args = (single_date,))
    proc.start()
    proc.join()
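A small usage note, not part of the original answer: on Windows the multiprocessing entry point has to be guarded with if __name__ == '__main__':, and the parent can tell whether the worker failed by inspecting Process.exitcode after join(). A minimal sketch with a placeholder date:

import datetime
import multiprocessing as mp

def work(date):
    # Placeholder for the memory-hungry per-day parsing and HDF5 writing.
    pass

if __name__ == '__main__':                    # guard required on Windows
    single_date = datetime.date(2012, 8, 8)   # example value; in the real program
                                              # this comes from the surrounding loop
    proc = mp.Process(target=work, args=(single_date,))
    proc.start()
    proc.join()
    if proc.exitcode != 0:
        # a non-zero exit code means the worker crashed or raised an exception
        print("worker for %s exited with code %s" % (single_date, proc.exitcode))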

Regarding "python - Is there a problem with the Python garbage collector?", we found a similar question on Stack Overflow: https://stackoverflow.com/questions/12452017/
