gpt4 book ai didi

python - 在 python 中查找大字符串中所有出现的子字符串的最快方法是什么

转载 作者:太空宇宙 更新时间:2023-11-03 14:10:07 24 4
gpt4 key购买 nike

编辑以阐明输入/输出。我认为无论如何这都会有点慢,但直到现在我还没有真正考虑过我的 python 脚本的速度,我正试图找出加快这些操作的方法。

我的输入是基因组序列的腌制字典。我目前正在研究两个基因组,即芽殖酵母基因组(磁盘上 11.5 MB)和人类基因组(磁盘上 2.8 GB)。这些词典具有以下形式:

seq_d = { 'chr1' : 'ATCGCTCGCTGCTCGCT', 'chr2' : 'CGATCAGTCATGCATGCAT', 
'chr3' : 'ACTCATCATCATCATACTGGC' }

我想在基因组的两条链中找到核苷酸的所有单碱基实例。其中+链是指上述字典中的序列,-链是序列的反向补码。我的输出是一个嵌套字典,其中顶级 keys+-,嵌套 keys 是染色体名称,值是 0 索引位置的列表:

nts = 'T'
test_d = {'+': {'chr3': [2, 5, 8, 11, 14, 17], 'chr2': [3, 7, 10, 14, 18],
'chr1': [1, 5, 9, 12, 16]}, '-': {'chr3': [0, 4, 7, 10, 13, 15],
'chr2': [2, 5, 9, 13, 17], 'chr1': [0]}}

test_d 定义了一组位置,以便稍后在脚本中的大型 Illumina 测序数据集中进行检查。

我的第一次尝试使用enumerate 和迭代。

import time
import numpy as np

rev_comps = { 'A' : 'T', 'T' : 'A', 'G' : 'C', 'C' : 'G', 'N' : 'N'}
test_d = { '+' : {}, '-' : {}}
nts = 'T'

s = time.time()
for chrom in seq_d:
plus_pos, minus_pos = [], []
chrom_seq = seq_d[chrom]
for pos, nt in enumerate(chrom_seq):
if nt in nts:
plus_pos.append(pos)
if rev_comps[nt] in nts:
minus_pos.append(pos)

test_d['+'][chrom] = plus_pos
test_d['-'][chrom] = minus_pos
e = time.time()
print 'The serial version took {} minutes...'.format((e-s)/60)

酵母的输出:

The serial version took 0.0455190300941 minutes...

人类的输出:

The serial version took 10.1694028815 minutes...

我尝试使用 numpy 数组而不是 enumerate() 和迭代:

s = time.time()
for chrom in seq_d:
chrom_seq = np.array(list(seq_d[chrom]))
nts = list(nts)
rev_nts = [rev_comps[nt] for nt in nts]
plus_pos = list(np.where(np.in1d(chrom_seq, nts) == True)[0])
minus_pos = list(np.where(np.in1d(chrom_seq, rev_nts) == True)[0])

test_d['+'][chrom] = plus_pos
test_d['-'][chrom] = minus_pos
e = time.time()
print 'The numpy version took {} minutes...'.format((e-s)/60)

酵母的输出:

The numpy version took 0.0309354345004 minutes...

人类的输出:

The numpy version took 9.86174853643 minutes...

为什么 numpy 版本会失去其对更大的人类基因组的性能优势?有没有更快的方法来做到这一点?我尝试使用 multiprocessing.Pool 实现一个版本,但这比任何一个版本都慢:

def getTestHelper(chrom_seq, nts, rev_comp):

rev_comps = { 'A' : 'T', 'T' : 'A', 'G' : 'C', 'C' : 'G', 'N' : 'N'}
if rev_comp:
nts = [rev_comps[nt] for nt in nts]
else:
nts = list(nts)
chrom_seq = np.array(list(chrom_seq))
mask = np.in1d(chrom_seq, nts)
pos_l = list(np.where(mask == True)[0])
return pos_l

s = time.time()
pool = Pool(4)
plus_pos = pool.map(functools.partial(getTestHelper, nts=nts, rev_comp=False), seq_d.values())
minus_pos = pool.map(functools.partial(getTestHelper, nts=nts, rev_comp=True), seq_d.values())
e = time.time()
print 'The parallel version took {} minutes...'.format((e-s)/60)

我还没有在人类基因组上运行过这个,但是酵母版本比较慢:

The parallel version took 0.652778700987 minutes...

最佳答案

老实说,我认为您一直在做正确的事情。

不过,您还可以对代码进行一些调整。一般来说,当性能是关键时,只在最内层的循环中做最少的事情。查看您的代码,在这方面仍有一些快速优化:

  1. 使用 if...elif 而不是 if...if。
  2. 不要在不必要的地方使用列表 - 例如对于 nts 和反之,只需一个字符串就足够了。
  3. 不要多次评估相同的结果 - 例如反向查找。

我猜你的多处理问题归结于这些非常大的字符串的序列化,抵消了你可能从并行运行中获得的任何性能提升。但是,可能还有另一种方法可以做到这一点 - 请参阅 Parallelizing a Numpy vector operation .我无法验证,因为我在安装 numexpr 时遇到困难。

将它们放在一起并尝试此路径中的其他一些建议,我得到以下结果:

$ python test.py
Original serial took 0.08330821593602498 minutes...
Using sets took 0.09072601397832235 minutes...
Using built-ins took 0.061421032746632895 minutes...
Using regex took 0.11649663050969442 minutes...
Optimized serial took 0.05909080108006795 minutes...
Original numpy took 0.04050511916478475 minutes...
Optimized numpy took 0.03438538312911987 minutes...

我的代码如下。

import time
import numpy as np
from random import choice
import re

# Create single large chromosome for the test...
seq = ""
for i in range(10000000):
seq += choice("ATGCN")
seq_d = {"Chromosome1": seq}


rev_comps = { 'A' : 'T', 'T' : 'A', 'G' : 'C', 'C' : 'G', 'N' : 'N'}
nts = 'T'

# Original serial implementation
def serial():
test_d = { '+' : {}, '-' : {}}
for chrom in seq_d:
plus_pos, minus_pos = [], []
chrom_seq = seq_d[chrom]
for pos, nt in enumerate(chrom_seq):
if nt in nts:
plus_pos.append(pos)
if rev_comps[nt] in nts:
minus_pos.append(pos)

test_d['+'][chrom] = plus_pos
test_d['-'][chrom] = minus_pos

# Optimized for single character tests
def serial2():
test_d = {'+': {}, '-': {}}
rev_nts = rev_comps[nts]
for chrom in seq_d:
plus_pos, minus_pos = [], []
chrom_seq = seq_d[chrom]
for pos, nt in enumerate(chrom_seq):
if nt == nts:
plus_pos.append(pos)
elif nt == rev_nts:
minus_pos.append(pos)

test_d['+'][chrom] = plus_pos
test_d['-'][chrom] = minus_pos

# Use sets instead of lists
def set_style():
test_d = { '+' : {}, '-' : {}}
for chrom in seq_d:
plus_pos, minus_pos = set(), set()
chrom_seq = seq_d[chrom]
for pos, nt in enumerate(chrom_seq):
if nt in nts:
plus_pos.add(pos)
if rev_comps[nt] in nts:
minus_pos.add(pos)

test_d['+'][chrom] = plus_pos
test_d['-'][chrom] = minus_pos

# Use regex to find either nucleotide...
def regex_it():
test_d = { '+' : {}, '-' : {}}
search = re.compile("(T|A)")
for chrom in seq_d:
pos = 0
plus_pos, minus_pos = [], []
chrom_seq = seq_d[chrom]
for sub_seq in search.split(chrom_seq):
if len(sub_seq) == 0:
continue
if sub_seq[0] == 'T':
plus_pos.append(pos)
elif sub_seq[0] == 'A':
minus_pos.append(pos)
pos += len(sub_seq)

test_d['+'][chrom] = plus_pos
test_d['-'][chrom] = minus_pos

# Use str.find instead of iteration
def use_builtins():
test_d = { '+' : {}, '-' : {}}
for chrom in seq_d:
plus_pos, minus_pos = [], []
chrom_seq = seq_d[chrom]
start = 0
while True:
pos = chrom_seq.find("T", start)
if pos == -1:
break
plus_pos.append(pos)
start = pos + 1

start = 0
while True:
pos = chrom_seq.find("A", start)
if pos == -1:
break
minus_pos.append(pos)
start = pos + 1

test_d['+'][chrom] = plus_pos
test_d['-'][chrom] = minus_pos

# Original numpy implementation
def numpy1():
test_d = { '+' : {}, '-' : {}}
for chrom in seq_d:
chrom_seq = np.array(list(seq_d[chrom]))
for_nts = list(nts)
rev_nts = [rev_comps[nt] for nt in nts]
plus_pos = list(np.where(np.in1d(chrom_seq, for_nts) == True)[0])
minus_pos = list(np.where(np.in1d(chrom_seq, rev_nts) == True)[0])

test_d['+'][chrom] = plus_pos
test_d['-'][chrom] = minus_pos

# Optimized for single character look-ups
def numpy2():
test_d = {'+': {}, '-': {}}
rev_nts = rev_comps[nts]
for chrom in seq_d:
chrom_seq = np.array(list(seq_d[chrom]))
plus_pos = np.where(chrom_seq == nts)
minus_pos = np.where(chrom_seq == rev_nts)

test_d['+'][chrom] = plus_pos
test_d['-'][chrom] = minus_pos

for fn, name in [
(serial, "Original serial"),
(set_style, "Using sets"),
(use_builtins, "Using built-ins"),
(regex_it, "Using regex"),
(serial2, "Optimized serial"),
(numpy1, "Original numpy"),
(numpy2, "Optimized numpy")]:
s = time.time()
fn()
e = time.time()
print('{} took {} minutes...'.format(name, (e-s)/60))

关于python - 在 python 中查找大字符串中所有出现的子字符串的最快方法是什么,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/39310956/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com