gpt4 book ai didi

python - 从大型 CSV 文件中快速提取行 block

转载 作者:太空狗 更新时间:2023-10-29 23:57:37 25 4
gpt4 key购买 nike

我有一个很大的 CSV 文件,其中包含格式如下的股票相关数据:

Ticker Symbol, Date, [some variables...]

所以每一行都以符号(如“AMZN”)开头,然后是日期,然后是与所选日期的价格或交易量相关的 12 个变量。此文件中代表了大约 10,000 种不同的证券,我每天都有一行表示每种证券的股票公开交易。该文件首先按股票代码字母顺序排列,然后按日期时间顺序排列。整个文件大约 3.3 GB。

我想要解决的任务是能够提取给定股票代码相对于当前日期的最新 n 行数据。我有执行此操作的代码,但根据我的观察,每次检索似乎平均需要大约 8-10 秒(所有测试都提取了 100 行)。

我有一些我想运行的函数需要我为成百上千个符号抓取这样的 block ,我真的很想减少时间。我的代码效率低下,但我不确定如何让它运行得更快。

首先,我有一个名为 getData 的函数:

def getData(symbol, filename):
out = ["Symbol","Date","Open","High","Low","Close","Volume","Dividend",
"Split","Adj_Open","Adj_High","Adj_Low","Adj_Close","Adj_Volume"]
l = len(symbol)
beforeMatch = True
with open(filename, 'r') as f:
for line in f:
match = checkMatch(symbol, l, line)
if beforeMatch and match:
beforeMatch = False
out.append(formatLineData(line[:-1].split(",")))
elif not beforeMatch and match:
out.append(formatLineData(line[:-1].split(",")))
elif not beforeMatch and not match:
break
return out

(此代码有几个辅助函数,checkMatch 和 formatLineData,我将在下面展示它们。)然后,还有另一个名为 getDataColumn 的函数,它获取我想要的列并显示正确的天数:

def getDataColumn(symbol, col=12, numDays=100, changeRateTransform=False):
dataset = getData(symbol)
if not changeRateTransform:
column = [day[col] for day in dataset[-numDays:]]
else:
n = len(dataset)
column = [(dataset[i][col] - dataset[i-1][col])/dataset[i-1][col] for i in range(n - numDays, n)]
return column

(如果为 True,changeRateTransform 会将原始数字转换为每日变化率数字。)辅助函数:

def checkMatch(symbol, symbolLength, line):
out = False
if line[:symbolLength+1] == symbol + ",":
out = True
return out

def formatLineData(lineData):
out = [lineData[0]]
out.append(datetime.strptime(lineData[1], '%Y-%m-%d').date())
out += [float(d) for d in lineData[2:6]]
out += [int(float(d)) for d in lineData[6:9]]
out += [float(d) for d in lineData[9:13]]
out.append(int(float(lineData[13])))
return out

有没有人知道我的代码的哪些部分运行缓慢以及我如何才能使其性能更好?如果不加快速度,我无法进行我想进行的那种分析。


编辑:为了回应评论,我对代码进行了一些更改,以便利用 csv 模块中的现有方法:

def getData(symbol, database):
out = ["Symbol","Date","Open","High","Low","Close","Volume","Dividend",
"Split","Adj_Open","Adj_High","Adj_Low","Adj_Close","Adj_Volume"]
l = len(symbol)
beforeMatch = True
with open(database, 'r') as f:
databaseReader = csv.reader(f, delimiter=",")
for row in databaseReader:
match = (row[0] == symbol)
if beforeMatch and match:
beforeMatch = False
out.append(formatLineData(row))
elif not beforeMatch and match:
out.append(formatLineData(row))
elif not beforeMatch and not match:
break
return out

def getDataColumn(dataset, col=12, numDays=100, changeRateTransform=False):
if not changeRateTransform:
out = [day[col] for day in dataset[-numDays:]]
else:
n = len(dataset)
out = [(dataset[i][col] - dataset[i-1][col])/dataset[i-1][col] for i in range(n - numDays, n)]
return out

使用 csv.reader 类时性能更差。我测试了两只股票,AMZN(靠近文件顶部)和 ZNGA(靠近文件底部)。使用原始方法,运行时间分别为 0.99 秒和 18.37 秒。使用利用 csv 模块的新方法,运行时间分别为 3.04 秒和 64.94 秒。两者都返回正确的结果。

我的想法是,查找股票所花费的时间多于解析所花费的时间。如果我在文件 A 中的第一只股票上尝试这些方法,这些方法都将在大约 0.12 秒内运行。

最佳答案

当您要对同一个数据集进行大量分析时,实用的方法是将其全部读入数据库。专为快速查询而生; CSV 不是。例如使用sqlite命令行工具,可以直接从CSV导入。然后在 (Symbol, Date) 上添加一个索引,查找几乎是即时的。

如果出于某种原因这是不可行的,例如因为新文件随时可能进来,而您在开始分析它们之前没有足够的准备时间,您将不得不充分利用直接处理 CSV ,这是我其余答案的重点。但请记住,这是一种平衡行为。您要么预先支付很多费用,要么为每次查找支付额外费用。最终,对于一定数量的查询来说,预付费用会更便宜。

优化就是将未完成的工作量最大化。在这种情况下,使用生成器和内置的 csv 模块不会有太大帮助。您仍然会读取整个文件并解析所有文件,至少对于换行符。有了这么多数据,这是不可能的。

解析需要阅读,因此您必须先找到解决方法。将 CSV 格式的所有复杂性留给专用模块的最佳实践在无法为您提供所需性能时毫无意义。有些作弊是必须要做的,但越少越好。在这种情况下,我想可以安全地假设新行的开头可以标识为 b'\n"AMZN",' (坚持你的例子)。是的,这里是二进制,因为记住:还没有解析。您可以从头扫描文件为二进制文件,直到找到第一行。从那里读取你需要的行数,以正确的方式解码和解析它们,等等。不需要在那里进行优化,因为与你没有这样做的数十万行不相关的行相比,100 行没什么好担心的工作。

放弃所有解析会给你带来很多好处,但阅读也需要优化。不要先将整个文件加载到内存中,并尽可能跳过 Python 层。使用 mmap 让操作系统决定将什么透明地加载到内存中,并让您直接处理数据。

如果符号接近末尾,您仍然有可能读取整个文件。这是一个线性搜索,这意味着它花费的时间与文件中的行数成线性比例。不过你可以做得更好。由于文件已排序,您可以改进函数以执行一种二进制搜索。将采取的步骤数(一个步骤正在读取一行)接近于行数的二进制对数。换句话说:您可以将文件分成两个(几乎)大小相等的部分的次数。当有 100 万行时,这是五个数量级的差异!

这是我根据 Python 自己的 bisect_left 想出的,并采取了一些措施来说明您的“值”跨越多个索引这一事实:

import csv
from itertools import islice
import mmap

def iter_symbol_lines(f, symbol):
# How to recognize the start of a line of interest
ident = b'"' + symbol.encode() + b'",'
# The memory-mapped file
mm = mmap.mmap(f.fileno(), 0, access=mmap.ACCESS_READ)
# Skip the header
mm.readline()
# The inclusive lower bound of the byte range we're still interested in
lo = mm.tell()
# The exclusive upper bound of the byte range we're still interested in
hi = mm.size()
# As long as the range isn't empty
while lo < hi:
# Find the position of the beginning of a line near the middle of the range
mid = mm.rfind(b'\n', 0, (lo+hi)//2) + 1
# Go to that position
mm.seek(mid)
# Is it a line that comes before lines we're interested in?
if mm.readline() < ident:
# If so, ignore everything up to right after this line
lo = mm.tell()
else:
# Otherwise, ignore everything from right before this line
hi = mid
# We found where the first line of interest would be expected; go there
mm.seek(lo)
while True:
line = mm.readline()
if not line.startswith(ident):
break
yield line.decode()

with open(filename) as f:
r = csv.reader(islice(iter_symbol_lines(f, 'AMZN'), 10))
for line in r:
print(line)

对此代码不作任何保证;我没有太注意边缘情况,我无法测试你的(任何)文件,所以把它当作一个概念证明。然而,它的速度非常快——想想 SSD 上的几十毫秒吧!

关于python - 从大型 CSV 文件中快速提取行 block ,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/41202413/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com