
python - How to efficiently read array columns from a TSV file into a single npz file per column?


I have a data file that looks like this:

58f0965a62d62099f5c0771d35dbc218        0.868632614612579       [0.028979932889342308, 0.004080114420503378, 0.03757167607545853]       [-0.006008833646774292, -0.010409083217382431, 0.01565541699528694]
36f7859ce47417470bc28384694f0ac4 0.835115909576416 [0.026130573824048042, -0.00358427781611681, 0.06635218113660812] [-0.06970945745706558, 0.03816794604063034, 0.03491008281707764]
59f7d617bb662155b0d49ce3f27093ed 0.907200276851654 [0.009903069585561752, -0.009721670299768448, 0.0151780480518937] [-0.03264783322811127, 0.0035394825972616673, -0.05089104175567627]

where the columns are, respectively:

  • the md5 hash of the data point
  • the target float output
  • an array of floats that I want to read into an np.array object
  • another array of floats that I want to read into an np.array object

I have been reading the file like this, to create one numpy array file for each of the two matrices of float arrays:

import numpy as np
from tqdm import tqdm

import pandas as pd

lol = []
with open('data.tsv') as fin:
    for line in tqdm(fin):
        md5hash, score, vector1, vector2 = line.strip().split('\t')
        row = {'md5_hash': md5hash, 'score': float(score),
               'vector1': np.array(eval(vector1)),
               'vector2': np.array(eval(vector2))
              }
        lol.append(row)

df = pd.DataFrame(lol)

training_vector1 = np.array(list(df['vector1']))
# Save the training vectors.
np.save('vector1.npz', training_vector1)

training_vector2 = np.array(list(df['vector2']))
# Save the training vectors.
np.save('vector2.npz', training_vector2)

While this works for a small dataset, the actual dataset has many more floats in the arrays, close to 200 million rows. Here is a sample of 100 rows: https://gist.github.com/1f6f0b2501dc334db1e0038d36452f5d

How do I efficiently read the array columns of the TSV file into a single npz file for each column?

Best Answer

First, a note on the overall problem: any approach that loads 200 million rows similar to the sample input you provided would require roughly 1.1 TB of memory. While that is possible, it is certainly not ideal. Therefore, rather than proceeding this way, I would recommend looking into approaches designed specifically for handling large datasets, such as HDF5.
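
As an illustration, here is a minimal sketch of such a streaming approach using h5py (the dataset names, chunk size, and flush() helper are my own choices, not part of the original answer): it appends parsed rows to resizable HDF5 datasets, so memory use stays bounded regardless of the number of lines.

import h5py
import numpy as np


def tsv_to_hdf5(in_path="data.tsv", out_path="data.h5", chunk_rows=100_000):
    # Parse the TSV line by line, buffer the rows, and flush each full buffer
    # into two resizable HDF5 datasets ("vector1" and "vector2").
    with open(in_path) as fin, h5py.File(out_path, "w") as h5:
        dsets = {}
        buf1, buf2 = [], []
        for line in fin:
            _, _, t1, t2 = line.rstrip("\n").split("\t")
            buf1.append([float(x) for x in t1[1:-1].split(",")])
            buf2.append([float(x) for x in t2[1:-1].split(",")])
            if len(buf1) >= chunk_rows:
                flush(h5, dsets, buf1, buf2)
                buf1, buf2 = [], []
        if buf1:
            flush(h5, dsets, buf1, buf2)


def flush(h5, dsets, buf1, buf2):
    for name, buf in (("vector1", buf1), ("vector2", buf2)):
        arr = np.asarray(buf)
        if name not in dsets:
            # resizable along the first axis, so later chunks can be appended
            dsets[name] = h5.create_dataset(name, data=arr, maxshape=(None, arr.shape[1]))
        else:
            d = dsets[name]
            d.resize(d.shape[0] + arr.shape[0], axis=0)
            d[-arr.shape[0]:] = arr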

That said, the problem at hand is not particularly complex, but going through pandas and eval() is probably neither ideal nor beneficial.

The same goes for pre-processing with cut into CSV files, which are only marginally simpler to read.

Assuming that np.save() will be equally fast regardless of how the array was produced, we can say that the following function reproduces the processing from the OP reasonably well:

import numpy as np
import pandas as pd


def process_tsv_OP(filepath="100-translation.embedded-3.tsv"):
    lol = []
    with open(filepath, "r") as fin:
        for line in fin:
            md5hash, score, vector1, vector2 = line.strip().split('\t')
            row = {'md5_hash': md5hash, 'score': float(score),
                   'vector1': np.array(eval(vector1)),
                   'vector2': np.array(eval(vector2))
                  }
            lol.append(row)
    df = pd.DataFrame(lol)
    training_vector1 = np.array(list(df['vector1']))
    training_vector2 = np.array(list(df['vector2']))
    return training_vector1, training_vector2

This can be simplified by avoiding pandas and the evil eval() (and some copying of data in memory along the way):

def text2row(text):
    text = text[1:-1]
    return [float(x) for x in text.split(',')]


def process_tsv(filepath="100-translation.embedded-3.tsv"):
    with open(filepath, "r") as in_file:
        v1 = []
        v2 = []
        for line in in_file:
            _, _, text_r1, text_r2 = line.strip().split('\t')
            r1 = text2row(text_r1)
            r2 = text2row(text_r2)
            v1.append(r1)
            v2.append(r2)
    v1 = np.array(v1)
    v2 = np.array(v2)
    return v1, v2

It is easy to show that the two produce the same output:

def same_res(x, y):
    return all(np.allclose(i, j) for i, j in zip(x, y))


same_res(process_tsv(), process_tsv_OP())
# True

but the timings are quite different:

%timeit process_tsv_OP()
# 1 loop, best of 5: 300 ms per loop
%timeit process_tsv()
# 10 loops, best of 5: 86.1 ms per loop

(on the sample input file obtained with wget https://gist.githubusercontent.com/alvations/1f6f0b2501dc334db1e0038d36452f5d/raw/ee31c052a4dbda131df182f0237dbe6e5197dff2/100-translation.embedded-3.tsv)
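
For completeness, to actually produce the per-column output files the question asks for, one would save the arrays returned by process_tsv() (a straightforward completion on my part; note that np.save() always writes the .npy format, while np.savez() is the function that produces an actual .npz archive):

v1, v2 = process_tsv()
np.save('vector1.npy', v1)
np.save('vector2.npy', v2)
# or, as a single .npz archive holding both columns:
np.savez('vectors.npz', vector1=v1, vector2=v2)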


Pre-processing the input with cut does not seem to be all that beneficial:

!time cut -f3 100-translation.embedded-3.tsv | rev | cut -c2- | rev | cut -c2- > vector1.csv
# real 0m0.184s
# user 0m0.102s
# sys 0m0.233s
!time cut -f4 100-translation.embedded-3.tsv | rev | cut -c2- | rev | cut -c2- > vector2.csv
# real 0m0.208s
# user 0m0.113s
# sys 0m0.279s
%timeit np.genfromtxt('vector1.csv', delimiter=','); np.genfromtxt('vector2.csv', delimiter=',')
# 1 loop, best of 5: 130 ms per loop

while using pd.read_csv() on the pre-processed files may shave off some time:

%timeit pd.read_csv('vector1.csv').to_numpy(); pd.read_csv('vector2.csv').to_numpy()
# 10 loops, best of 5: 85.7 ms per loop

This still appears slower than the original approach on the provided dataset once the time spent in cut itself is included (although cut may scale better for larger inputs).
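
One caveat of mine, not from the original answer: the intermediate CSV files have no header row, so for correct values (as opposed to mere timing) pd.read_csv() needs header=None; otherwise the first data row is consumed as column names:

import pandas as pd

# header=None prevents the first data row from being used as column names
v1 = pd.read_csv('vector1.csv', header=None).to_numpy()
v2 = pd.read_csv('vector2.csv', header=None).to_numpy()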


If you really want to stick to the npy file format for this, you may at least wish to append to your output in chunks. While this is not well supported by NumPy alone, you could use NpyAppendArray (see also here). The modified process_tsv() would look something like:

import os
from npy_append_array import NpyAppendArray


def process_tsv_append(
    in_filepath="100-translation.embedded-3.tsv",
    out1_filepath="out1.npy",
    out2_filepath="out2.npy",
    append_every=10,
):
    # clear output files
    for filepath in (out1_filepath, out2_filepath):
        if os.path.isfile(filepath):
            os.remove(filepath)
    with \
            open(in_filepath, "r") as in_file, \
            NpyAppendArray(out1_filepath) as npaa1, \
            NpyAppendArray(out2_filepath) as npaa2:
        v1 = []
        v2 = []
        for i, line in enumerate(in_file, 1):
            _, _, text_r1, text_r2 = line.strip().split("\t")
            r1 = text2row(text_r1)
            r2 = text2row(text_r2)
            v1.append(r1)
            v2.append(r2)
            if i % append_every == 0:
                npaa1.append(np.array(v1))
                npaa2.append(np.array(v2))
                v1 = []
                v2 = []
        if len(v1) > 0:  # assumes len(v1) == len(v2)
            npaa1.append(np.array(v1))
            npaa2.append(np.array(v2))


process_tsv_append()

v1 = np.load("out1.npy")
v2 = np.load("out2.npy")
same_res(process_tsv(), (v1, v2))
# True
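
A side note of mine, not from the original answer: .npy files written in chunks this way can later be read lazily via memory mapping, which avoids loading the full result (potentially ~1.1 TB here) into RAM at once:

v1 = np.load("out1.npy", mmap_mode="r")  # memory-mapped, rows are read on access
print(v1.shape)  # the shape is available immediately, without reading the data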

All of this can be sped up fairly blindly with Cython, but the speed-up appears to be marginal:

%%cython -c-O3 -c-march=native -a
#cython: language_level=3, boundscheck=False, wraparound=False, initializedcheck=False, cdivision=True, infer_types=True


import numpy as np


cpdef text2row_cy(text):
    return [float(x) for x in text[1:-1].split(',')]


cpdef process_tsv_cy(filepath="100-translation.embedded-3.tsv"):
    with open(filepath, "r") as in_file:
        v1 = []
        v2 = []
        for line in in_file:
            _, _, text_r1, text_r2 = line.strip().split('\t')
            r1 = text2row_cy(text_r1)
            r2 = text2row_cy(text_r2)
            v1.append(r1)
            v2.append(r2)
    v1 = np.array(v1)
    v2 = np.array(v2)
    return v1, v2

print(same_res(process_tsv_cy(), process_tsv_OP()))
# True
%timeit process_tsv_cy()
# 10 loops, best of 5: 72.4 ms per loop

Similarly, pre-allocating the arrays does not appear to be beneficial:

def text2row_out(text, out):
    for i, x in enumerate(text[1:-1].split(',')):
        out[i] = float(x)


def process_tsv_alloc(filepath="100-translation.embedded-3.tsv"):
    with open(filepath, "r") as in_file:
        # num lines
        num_lines = in_file.read().count("\n")
        # num cols
        in_file.seek(0)
        line = next(in_file)
        _, _, text_r1, text_r2 = line.strip().split('\t')
        num_cols1 = len(text_r1.split(","))
        num_cols2 = len(text_r2.split(","))
        # populate arrays
        v1 = np.empty((num_lines, num_cols1))
        v2 = np.empty((num_lines, num_cols2))
        in_file.seek(0)
        for i, line in enumerate(in_file):
            _, _, text_r1, text_r2 = line.strip().split('\t')
            text2row_out(text_r1, v1[i])
            text2row_out(text_r2, v2[i])
    return v1, v2


print(same_res(process_tsv_alloc(), process_tsv_OP()))
%timeit process_tsv_alloc()
# 10 loops, best of 5: 110 ms per loop

A significant reduction of the running time can be obtained with Numba (and possibly with Cython, too) by rewriting everything to be much closer to C. To make our code compatible with Numba, and to actually benefit from its acceleration, substantial modifications are needed:

  • open the file as bytes (UTF-8 is no longer supported, which is not a significant issue for the problem at hand)
  • read and process the file in blocks, which need to be sufficiently large, say around 1 MB
  • write all the string handling functions by hand, notably the string-to-float conversion

import numpy as np
import numba as nb


@nb.njit
def bytes2int(text):
    c_min = ord("0")
    c_max = ord("9")

    n = len(text)
    valid = n > 0
    # determine sign
    start = n - 1
    stop = -1
    sign = 1
    if valid:
        first = text[0]
        if first == ord("+"):
            stop = 0
        elif first == ord("-"):
            sign = -1
            stop = 0
    # parse rest
    number = 0
    j = 0
    for i in range(start, stop, -1):
        c = text[i]
        if c_min <= c <= c_max:
            number += (c - c_min) * 10 ** j
            j += 1
        else:
            valid = False
            break
    return sign * number if valid else None
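

# A quick sanity check (mine, not from the original answer): bytes2int()
# returns the parsed integer, or None when the input is not a valid integer.
assert bytes2int(b"42") == 42
assert bytes2int(b"-7") == -7
assert bytes2int(b"4x2") is None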


@nb.njit
def bytes2float_helper(text):
    sep = ord(".")
    c_min = ord("0")
    c_max = ord("9")

    n = len(text)
    valid = n > 0
    # determine sign
    start = n - 1
    stop = -1
    sign = 1
    if valid:
        first = text[0]
        if first == ord("+"):
            stop = 0
        elif first == ord("-"):
            sign = -1
            stop = 0
    # parse rest
    sep_pos = 0
    number = 0
    j = 0
    for i in range(start, stop, -1):
        c = text[i]
        if c_min <= c <= c_max:
            number += (c - c_min) * 10 ** j
            j += 1
        elif c == sep and sep_pos == 0:
            sep_pos = j
        else:
            valid = False
            break
    return sign * number, sep_pos, valid


@nb.njit
def bytes2float(text):
    # split off the exponent part introduced by "e"/"E" (if any),
    # parse mantissa and exponent separately, then combine them
    exp_chars = b"eE"
    exp_pos = -1
    for exp_char in exp_chars:
        for i, c in enumerate(text[::-1]):
            if c == exp_char:
                exp_pos = i
                break
        if exp_pos > -1:
            break
    if exp_pos > 0:
        exp_number = bytes2int(text[-exp_pos:])
        if exp_number is None:
            exp_number = 0
        number, sep_pos, valid = bytes2float_helper(text[:-exp_pos-1])
        result = number / 10.0 ** (sep_pos - exp_number) if valid else None
    else:
        number, sep_pos, valid = bytes2float_helper(text)
        result = number / 10.0 ** sep_pos if valid else None
    return result
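

# Likewise (my check, not from the original answer): the hand-rolled float
# parser handles both plain and exponent notation.
assert bytes2float(b"0.5") == 0.5
assert bytes2float(b"-1.5e2") == -150.0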


@nb.njit
def btrim(text):
    # strip whitespace (space, tab, newline, CR) from the left
    # and spaces from the right
    space = ord(" ")
    tab = ord("\t")
    nl = ord("\n")
    cr = ord("\r")
    start = 0
    stop = 0
    for c in text:
        if c == space or c == tab or c == nl or c == cr:
            start += 1
        else:
            break
    for c in text[::-1]:
        if c == space:
            stop += 1
        else:
            break
    if start == 0 and stop == 0:
        return text
    elif stop == 0:
        return text[start:]
    else:
        return text[start:-stop]


@nb.njit
def text2row_nb(text, sep, num_cols, out, curr_row):
    # parse a "x, y, z"-style byte string (brackets already stripped by the
    # caller) into row curr_row of out; values are separated by ", ",
    # hence the +2 offsets
    last_i = 0
    j = 0
    for i, c in enumerate(text):
        if c == sep:
            x = bytes2float(btrim(text[last_i:i]))
            out[curr_row, j] = x
            last_i = i + 2
            j += 1
    x = bytes2float(btrim(text[last_i:]))
    out[curr_row, j] = x


@nb.njit
def process_line(line, psep, sep, num_psep, num_cols1, num_cols2, out1, out2, curr_row):
    # locate the (tab) field separators, then parse the last two fields,
    # i.e. the two vectors, into row curr_row of out1 and out2
    if len(line) > 0:
        psep_pos = np.empty(num_psep, dtype=np.int_)
        j = 0
        for i, char in enumerate(line):
            if char == psep:
                psep_pos[j] = i
                j += 1
        text2row_nb(line[psep_pos[-2] + 2:psep_pos[-1] - 1], sep, num_cols1, out1, curr_row)
        text2row_nb(line[psep_pos[-1] + 2:-1], sep, num_cols2, out2, curr_row)


@nb.njit
def decode_block(block, psep, sep, num_lines, num_cols1, num_cols2, out1, out2, curr_row):
    # split a block into lines, process up to num_lines of them,
    # and return the unprocessed remainder of the block
    nl = ord("\n")
    last_i = 0
    i = j = 0
    for c in block:
        if c == nl:
            process_line(block[last_i:i], psep, sep, 3, num_cols1, num_cols2, out1, out2, curr_row)
            j += 1
            last_i = i
            curr_row += 1
            if j >= num_lines:
                break
        i += 1
    return block[i + 1:], curr_row


@nb.njit
def count_nl(block, start=0):
    nl = ord("\n")
    for c in block:
        if c == nl:
            start += 1
    return start


def process_tsv_block(filepath="100-translation.embedded-3.tsv", size=2 ** 18):
    with open(filepath, "rb") as in_file:
        # count newlines
        num_lines = 0
        while True:
            block = in_file.read(size)
            if block:
                num_lines = count_nl(block, num_lines)
            else:
                break

        # count num columns
        in_file.seek(0)
        line = next(in_file)
        _, _, text_r1, text_r2 = line.strip().split(b'\t')
        num_cols1 = len(text_r1.split(b","))
        num_cols2 = len(text_r2.split(b","))

        # fill output arrays
        v1 = np.empty((num_lines, num_cols1))
        v2 = np.empty((num_lines, num_cols2))
        in_file.seek(0)
        remainder = b""
        curr_row = 0
        while True:
            block = in_file.read(size)
            if block:
                block = remainder + block
                num_lines = count_nl(block)
                if num_lines > 0:
                    remainder, curr_row = decode_block(block, ord("\t"), ord(","), num_lines, num_cols1, num_cols2, v1, v2, curr_row)
                else:
                    remainder = block
            else:
                num_lines = count_nl(remainder)
                if num_lines > 0:
                    remainder, curr_row = decode_block(remainder, ord("\t"), ord(","), num_lines, num_cols1, num_cols2, v1, v2, curr_row)
                break
    return v1, v2

The reward for all this work is a mere ~2x speed-up over process_tsv():

print(same_res(process_tsv_block(), process_tsv_OP()))
# True
%timeit process_tsv_block()
# 10 loops, best of 5: 48.8 ms per loop

For "python - How to efficiently read array columns from a TSV file into a single npz file per column?", see the similar question on Stack Overflow: https://stackoverflow.com/questions/73147276/
