gpt4 book ai didi

python - VCF 解析和插入数据库速度慢

转载 作者:太空宇宙 更新时间:2023-11-03 19:47:22 26 4
gpt4 key购买 nike

我的数据文件是gnomad.exomes.r2.1.1.sites.13.vcf.bgz .

我已经解压了VCF。这是我在过滤数据后插入 Postgres 的代码:

# =============================================================================
# import libraries for python
# =============================================================================
import vcf
import os.path
from sqlalchemy import create_engine
import psycopg2
import datetime
import sys
import _thread
import csv

from io import StringIO
import pandas as pd

# Database connection settings for the SQLAlchemy engine used by to_sql().
# NOTE(review): credentials are hard-coded here (and differ from the password
# passed to psycopg2.connect() later in the script) -- move both to a single
# config source / environment variables and make them agree.
db_user_name = "postgres"
db_password = "password"
db_name = "vcd"
db_host_name = "localhost"
# SQLAlchemy 1.4+ removed support for the legacy 'postgres://' scheme;
# 'postgresql://' works on all versions.
url = 'postgresql://' + db_user_name + ':' + db_password + '@' + db_host_name + '/' + db_name
engine = create_engine(url)

# =============================================================================
# function drops the previous table if exists and creates new table with dynamic table name
# =============================================================================
def create_table(table_name, table_type, conn_obj):
    """Drop ``table_name`` if it exists and recreate it, then commit.

    table_type '1' creates the wide attribute/value table the VCF rows go
    into; table_type '2' creates the small metadata table.  Any other value
    leaves only the DROP executed.  Returns the table name.

    NOTE(review): the table name is spliced into the SQL with str.format --
    acceptable only because every caller passes a hard-coded name; never
    route user input through this parameter (SQL injection).
    """
    dbcur = conn_obj.cursor()
    dbcur.execute("""DROP TABLE IF EXISTS {0};""".format(table_name))
    if table_type == '1':
        dbcur.execute("""CREATE TABLE IF NOT EXISTS {0} (as_ID INT, as_NM TEXT, as_at_ID INT, as_at_NM TEXT, VCF_ID TEXT, VARIANT_ID TEXT, as_at_LINE_SEQ INT, DATE_START DATE, DATE_END DATE, as_at_VAL_SEQ INT, as_at_VA_NM TEXT, as_at_VALUE TEXT);""".format(table_name))
    elif table_type == '2':
        dbcur.execute("""CREATE TABLE IF NOT EXISTS {0} (as_at_VA_NM TEXT, as_at_VA_DESC TEXT,as_at_DATA_TYPE TEXT);""".format(table_name))
    conn_obj.commit()
    print('Table ' + table_name + ' created successfully.\n')
    return table_name

# =============================================================================
# functions that buffer filtered VCF data and insert it into the values table
# =============================================================================

def create_tuples(as_id, vcf_id, Variant_id, as_at_nm, sample_id, as_at_va_nm,
                  as_at_value, date_start, date_end, as_va_seq,
                  as_at_va_line_seq, data_table_name):
    """Buffer one attribute/value row; flush to the DB every 10,000 rows.

    Rows whose variable name is not in the ``headers_arr`` whitelist are
    silently dropped.  ``date_start``/``date_end``/``Variant_id``/``sample_id``
    are accepted for interface stability but the stored ids are currently
    hard-coded to '1'.
    """
    global datalist
    as_at_id = '1'
    sample_id = '1'
    variant_id = '1'
    as_nm = 'VCF'
    if as_at_va_nm not in headers_arr:  # whitelist filter
        return
    datalist.append([as_id, str(as_nm), as_at_id, as_at_nm, vcf_id, variant_id,
                     as_at_va_line_seq, as_va_seq, as_at_va_nm, as_at_value])
    if len(datalist) == 10000:
        # NOTE(review): _thread gives no way to join, so the process can exit
        # before a background insert finishes, and errors in the thread vanish
        # silently -- consider concurrent.futures.ThreadPoolExecutor.  The
        # batch list is handed off and a fresh list bound, so the worker owns
        # its batch exclusively.
        _thread.start_new_thread(insertdata, (datalist, data_table_name, as_id))
        datalist = []
def insertdata(datalist2, data_table_name, as_id):
    """Bulk-insert one batch of rows via pandas + PostgreSQL COPY.

    ``datalist2`` is a list of 10-element row lists in the column order
    below.  ``as_id`` is unused (kept for interface compatibility with the
    _thread.start_new_thread call site).  Logs the running line count
    (column 6 of the last row) and elapsed time since the module-level
    ``execution_start_time``.
    """
    columns = ["as_id", "as_nm", "as_at_id", "as_at_nm", "vcf_id",
               "variant_id", "as_at_line_seq", "as_at_val_seq",
               "as_at_va_nm", "as_at_value"]
    df = pd.DataFrame(datalist2, columns=columns)
    df.to_sql(data_table_name, engine, if_exists='append',
              method=psql_insert_copy, index=False)
    elapsed = datetime.datetime.now() - execution_start_time
    # datalist2[-1][6] is the line sequence of the last buffered row, used as
    # a running "records processed" figure in the progress message.
    print(f'Total inserted records:{datalist2[-1][6]} in time {elapsed}')

def psql_insert_copy(table, conn, keys, data_iter):
    """``method=`` callback for pandas ``DataFrame.to_sql``.

    Serialises the batch to an in-memory CSV and streams it through
    PostgreSQL ``COPY ... FROM STDIN``, which is far faster than
    row-by-row INSERT statements.
    """
    # unwrap the raw DBAPI (psycopg2) connection behind SQLAlchemy
    raw_conn = conn.connection
    with raw_conn.cursor() as cursor:
        csv_buffer = StringIO()
        csv.writer(csv_buffer).writerows(data_iter)
        csv_buffer.seek(0)

        column_list = ', '.join('"{}"'.format(key) for key in keys)
        qualified_name = (
            '{}.{}'.format(table.schema, table.name)
            if table.schema
            else table.name
        )

        statement = 'COPY {} ({}) FROM STDIN WITH CSV'.format(
            qualified_name, column_list)
        cursor.copy_expert(sql=statement, file=csv_buffer)
# =============================================================================
# map a VCF column header to the matching attribute of the current `record`
# =============================================================================

# The fixed set of VCF column names PyVCF exposes as record attributes.
_VCF_COLUMN_NAMES = ("CHROM", "POS", "ID", "REF", "ALT", "QUAL", "FILTER",
                     "INFO", "FORMAT")

def get_DataFrom_Column(header):
    """Return ``{'variable_name', 'variable_value'}`` for one VCF column.

    Reads the module-level ``record`` (the VCF row currently being iterated
    in the main loop).  Returns the string 'Invalid header' for any header
    outside the nine standard VCF columns.

    NOTE: the previous switch-dict built all nine entries (touching every
    attribute of ``record``) on every call in the hot loop; the attribute is
    now looked up lazily, and ``record`` is not touched at all for an
    invalid header.
    """
    if header not in _VCF_COLUMN_NAMES:
        return "Invalid header"
    # PyVCF exposes each column as an attribute with the same name.
    return {'variable_name': header, 'variable_value': getattr(record, header)}
# Cache: INFO field name -> the text after the last ':' in its description
# (VEP-style annotation fields embed a '|'-separated sub-field header there).
dic = {}

def insert_infos_metadata(reader_infos, line_index, file_name, as_at_nm):
    """Populate ``dic`` from a VCF reader's INFO metadata.

    For every INFO field whose description contains a ':', remember the text
    after the last colon (e.g. the pipe-separated sub-field header of
    VEP/CSQ-style annotations).  ``file_name`` and ``as_at_nm`` are unused
    placeholders; ``line_index`` is returned unchanged.
    """
    for name, info in reader_infos.items():
        # rpartition: one pass instead of splitting the description twice
        _, sep, tail = info.desc.rpartition(':')
        if sep:
            dic[name] = tail
    return line_index

# =============================================================================
# global variable declaration
# =============================================================================
index = 0  # number of VCF records read so far (drives the 50,000-record stop)
variableList = []  # never used below -- candidate for removal
datalist = []  # rows buffered by create_tuples(), flushed every 10,000
headers_arr = []  # whitelist of variable names to keep; filled before the main loop
totalrecordinserted=0  # never used below
curentRecord=-1  # never used below (name is a typo of "currentRecord")
execution_start_time= datetime.datetime.now() # calculate execution start time

# =============================================================================
# main script: open the VCF, recreate the target table, then stream records
# into Postgres in 10,000-row batches
# =============================================================================

file_path = "/gnomad.exomes.r2.1.1.sites.13.vcf"
if os.path.isfile(file_path):  # check the file exists at the given path
    if True:  # NOTE(review): leftover of an interactive menu; always taken
        conn = psycopg2.connect(host="localhost", database="vcd",
                                user="postgres", password="pgAdmin")
        cursor = conn.cursor()
        data_table_name = "data2"
        create_table(data_table_name, "1", conn)

        vcf_reader = vcf.Reader(open(file_path, 'r'))
        # NOTE(review): _column_headers is a private PyVCF attribute.
        column_headers = vcf_reader._column_headers
        line_index = 0
        null_date = None
        as_at_header = 'Header'
        as_at_variant = 'Variant'
        reader_infos = vcf_reader.infos
        line_index = insert_infos_metadata(reader_infos, line_index,
                                           file_path, as_at_header)

        execution_start_time = datetime.datetime.now()
        # Whitelist of variable names worth storing.  A frozenset gives O(1)
        # membership tests in create_tuples(); the original list was an O(n)
        # scan executed for every attribute of every record.
        headers_arr = frozenset(['INFO/AF', 'INFO/AF_NFE_SEU', 'INFO/AF_RAW', 'INFO/AF_FIN_FEMALE', 'INFO/AF_NFE_BGR', 'INFO/AF_SAS_MALE', 'INFO/AF_AFR_MALE', 'INFO/AF_AFR', 'INFO/AF_EAS_FEMALE', 'INFO/AF_AFR_FEMALE', 'INFO/AF_SAS', 'INFO/AF_NFE_ONF', 'INFO/AF_FIN_MALE', 'INFO/AF_NFE_FEMALE', 'INFO/AF_AMR', 'INFO/AF_EAS', 'INFO/AF_ASJ_MALE', 'INFO/AF_OTH_FEMALE', 'INFO/AF_NFE_SWE', 'INFO/AF_NFE_NWE', 'INFO/AF_EAS_JPN', 'INFO/AF_FEMALE', 'INFO/AF_EAS_KOR', 'INFO/AF_EAS_OEA', 'INFO/AF_NFE_EST', 'INFO/AF_EAS_MALE', 'INFO/AF_NFE', 'INFO/AF_FIN', 'INFO/AF_NFE_MALE', 'INFO/AF_SAS_FEMALE', 'INFO/AF_ASJ_FEMALE', 'INFO/AF_ASJ', 'INFO/AF_OTH', 'INFO/AF_MALE', 'INFO/AF_AMR_MALE', 'INFO/AF_AMR_FEMALE', 'INFO/AF_OTH_MALE', 'INFO/AF_POPMAX', 'CHROM', 'POS', 'REF', 'ALT', 'QUAL', 'FILTER'])

        for record in vcf_reader:
            index += 1
            line_index += 1
            if index == 50000:  # stop after ~50k records (benchmark limit)
                break
            Variant_id = ''
            sample_name = ''
            for header in column_headers:
                if header != 'FORMAT':
                    header_data_dict = get_DataFrom_Column(header)
                    header_data_dict_value = header_data_dict['variable_value']
                    if isinstance(header_data_dict_value, str):
                        create_tuples("1", file_path, Variant_id, as_at_variant,
                                      sample_name, header,
                                      str(header_data_dict_value), null_date,
                                      null_date, 1, str(line_index),
                                      data_table_name)
                    elif isinstance(header_data_dict_value, list):
                        # an empty FILTER list means the variant passed
                        if len(header_data_dict_value) == 0 and header == 'FILTER':
                            create_tuples("1", file_path, Variant_id,
                                          as_at_variant, sample_name, header,
                                          'PASS', null_date, null_date, 1,
                                          str(line_index), data_table_name)
                        else:
                            for seq, item in enumerate(header_data_dict_value):
                                create_tuples("1", file_path, Variant_id,
                                              as_at_variant, sample_name,
                                              header, str(item), null_date,
                                              null_date, str(seq),
                                              str(line_index), data_table_name)
                    elif isinstance(header_data_dict_value, dict):
                        # INFO column: one row per key, or one row per
                        # sub-field for annotation keys cached in `dic`
                        for dict_val_record in header_data_dict_value:
                            variable_name = header + '/' + dict_val_record
                            variable_value = header_data_dict_value[dict_val_record]
                            variable_seq = 1
                            if isinstance(variable_value, list):
                                for value in variable_value:
                                    if dict_val_record in dic:
                                        # split the pipe-separated annotation
                                        # into one row per sub-field, named
                                        # after dic's cached header parts
                                        # (dead recomputation of the INFO
                                        # description removed here)
                                        arr_obj_val = value.strip("()[]").split("|")
                                        vararray = dic[dict_val_record].split("|")
                                        for arr_index, sub_value in enumerate(arr_obj_val):
                                            variable_name = (header + '/' + dict_val_record + '/'
                                                             + vararray[arr_index].lstrip().replace("'", ""))
                                            variable_value = sub_value
                                            create_tuples("1", file_path, Variant_id, as_at_variant,
                                                          sample_name, variable_name, variable_value,
                                                          null_date, null_date, str(variable_seq),
                                                          str(line_index), data_table_name)
                                    else:
                                        create_tuples("1", file_path, Variant_id, as_at_variant,
                                                      sample_name, variable_name, value,
                                                      null_date, null_date, str(variable_seq),
                                                      str(line_index), data_table_name)
                                    variable_seq += 1
                            else:
                                create_tuples("1", file_path, Variant_id, as_at_variant,
                                              sample_name, variable_name, variable_value,
                                              null_date, null_date, str(variable_seq),
                                              str(line_index), data_table_name)
                    else:
                        # scalar (int/float/None) column
                        variable_name = header
                        variable_value = str(header_data_dict_value)
                        create_tuples("1", file_path, Variant_id, as_at_variant,
                                      sample_name, variable_name, variable_value,
                                      null_date, null_date, 1, str(line_index),
                                      data_table_name)
                else:
                    # FORMAT column (per-sample genotype data)
                    format_data_arr = record.FORMAT.split(":")
                    for format_data_record in format_data_arr:
                        variable_name = header + '/' + format_data_record
                        # NOTE(review): `record_data` is undefined -- this
                        # branch raises NameError for any record with FORMAT
                        # data (gnomAD sites VCFs have none, so it never ran).
                        # Probably meant to iterate record.samples; confirm
                        # before relying on it.
                        variable_value = record.genotype(record_data.sample)[format_data_record]
                        create_tuples("1", file_path, Variant_id, as_at_variant,
                                      sample_name, variable_name, variable_value,
                                      null_date, null_date, 1, str(line_index),
                                      data_table_name)

        # flush the final partial batch synchronously
        insertdata(datalist, data_table_name, "1")
        print('Variants inserted successfully.')
    else:
        print('Incorrect Choice.')
else:
    print('File not present.')

这是输出表:

as_id   as_nm   as_at_id   as_at_nm vcf_id  variant_id  as_at_line_seq  date_start  date_end    as_at_val_seq   as_at_va_nm                         as_at_value
1 VCF 1 Variant "" 1 1 1 CHROM 22
1 VCF 1 Variant "" 1 1 1 POS 16050036
1 VCF 1 Variant "" 1 1 1 ID rs374742143
1 VCF 1 Variant "" 1 1 1 REF A
1 VCF 1 Variant "" 1 1 0 ALT C
1 VCF 1 Variant "" 1 1 1 QUAL 442156.34
1 VCF 1 Variant "" 1 1 0 FILTER RF
1 VCF 1 Variant "" 1 1 1 INFO/non_neuro_nhomalt_amr_male 0
1 VCF 1 Variant "" 1 1 1 INFO/controls_AN_afr 4
1 VCF 1 Variant "" 1 1 1 INFO/controls_AC_amr_female 3
1 VCF 1 Variant "" 1 1 1 INFO/AF_oth_female 0.5
1 VCF 1 Variant "" 1 1 1 INFO/non_topmed_nhomalt_afr_female 0

处理 50,000 条记录大约需要 5 分 43 秒。我不明白为什么要花这么长时间。

有人可以帮我优化这段代码吗?

最佳答案

要加快插入速度,可以在事务中调整 work_mem 和/或 maintenance_work_mem。另外,this answer 对此有更深入的解释。

关于python - VCF 解析和插入数据库速度慢,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/60041739/

26 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com