gpt4 book ai didi

单独的数据库类中的 MySQL 连接池 - 如何?

转载 作者:行者123 更新时间:2023-11-29 10:45:49 24 4
gpt4 key购买 nike

我正在编写一个应用程序,其中我已将所有 MySQL 连接设置和拆卸移至一个类,并使用 With 语句在各个函数调用中进行初始化。

现在开发已全部完成,我正在进行优化并希望设置连接池 - 但我无法弄清楚应该在哪里设置 - 如果我在 __enter__ 中(即设置对象时)初始化连接池,这样不是会为每个对象都建立一个新的池吗?

如果我将池设置放在模块的全局中,那么如何确保在开始创建数据库对象之前设置池?

我的数据库代码看起来有点像这样:

"""Module for abstracting database connectivity.

Import this module, open a ``UseDatabase`` context with ``with`` and then call
its run_query(), run_query_vals() or run_query_no_return() methods.
"""
# Setting up details for connecting to a local MariaDB/MySQL instance
# replace with suitable code/module when porting to cloud/production
import sys

import mysql.connector

# Names exported by ``from <module> import *``.
__all__ = ['UseDatabase', 'CredentialsError', 'ConnectionError', 'SQLError']

# NOTE: this name shadows Python's builtin ConnectionError inside this module.
class ConnectionError(Exception):
    """Raised when the database server cannot be reached."""


class CredentialsError(Exception):
    """Raised when the server rejects the supplied user/password."""

class SQLError(Exception):
    """Raised when a query fails because of an error in the SQL itself."""

# Connection details kept here as a reminder; they also serve as the default
# server settings while unit testing.
dbconfig = {
    'host': '127.0.0.1',
    'user': 'statdev',
    'password': 'statdev',
    'database': 'stat',
}

class UseDatabase:
    """Context manager that opens a MySQL connection with a dictionary cursor.

    Usage::

        with UseDatabase(config) as db:
            rows = db.run_query('SELECT ...').fetchall()

    On a clean exit the transaction is committed; when the ``with`` body
    raised, the transaction is rolled back instead. In both cases the cursor
    and the connection are closed.
    """

    # Example: {'host': '127.0.0.1', 'user': 'statdev', 'password': 'statdev', 'database': 'stat'}
    config = None

    def __init__(self, config: dict):
        """Store the keyword arguments later passed to mysql.connector.connect()."""
        self.config = config
        self.conn = None
        self.cursor = None

    def __enter__(self) -> 'UseDatabase':
        """Open the connection and cursor and return this object.

        Raises:
            ConnectionError: the connector raised InterfaceError.
            CredentialsError: the connector raised ProgrammingError.
            Exception: any other failure is printed and re-raised unchanged,
                so the ``with`` body never runs without a usable connection.
        """
        # Start from a clean slate so a failed attempt never sees stale handles.
        self.conn = None
        self.cursor = None
        try:
            self.conn = mysql.connector.connect(**self.config)
            self.cursor = self.conn.cursor(dictionary=True)
        except mysql.connector.InterfaceError as err:
            print('Can\'t connect to Database - is it available? \nError: ', str(err))
            self._release()
            raise ConnectionError(err) from err
        except mysql.connector.ProgrammingError as err:
            print('Invalid credentials - please check ID/Password. \nError: ', str(err))
            self._release()
            raise CredentialsError(err) from err
        except mysql.connector.IntegrityError as err:
            print("Error: {}".format(err))
            self._release()
            raise
        except Exception as err:
            print('Something else went wrong:', str(err))
            self._release()
            raise
        return self

    def __exit__(self, exc_type, exc_value, exc_traceback):
        """Finish the transaction and release the connection.

        Raises:
            SQLError: the ``with`` body raised the connector's ProgrammingError.

        Any other exception from the ``with`` body propagates unchanged
        (this method returns False), keeping its original traceback.
        """
        try:
            if exc_type is None:
                self.conn.commit()
            else:
                # Do not persist half-finished work from a failed block.
                self.conn.rollback()
        finally:
            # Always release, even if commit/rollback itself failed.
            self._release()

        if exc_type is None:
            return False
        if issubclass(exc_type, mysql.connector.errors.ProgrammingError):
            print('Error in SQL Code - please check the query. \nError: ', str(exc_type))
            raise SQLError(exc_value) from exc_value
        print('Something else went wrong\n', str(exc_type))
        return False

    def _release(self):
        """Close the cursor and the connection, whichever of them was opened."""
        try:
            if self.cursor is not None:
                self.cursor.close()
        finally:
            if self.conn is not None:
                self.conn.close()

    def run_query(self, query_str):
        """Execute a query without parameters and return the cursor."""
        self.cursor.execute(query_str, None)
        return self.cursor

    def run_query_vals(self, query_str, tupleval):
        """Execute a parameterised query with ``tupleval`` and return the cursor."""
        self.cursor.execute(query_str, tupleval)
        return self.cursor

    def run_query_no_return(self, query_str):
        """Execute a statement whose result is not needed.

        The cursor is still returned, as it always has been, so existing
        callers that use the return value keep working.
        """
        self.cursor.execute(query_str)
        return self.cursor

def test():
    """Smoke-test the module against the server described by ``dbconfig``.

    Runs ``Select NULL from dual`` through UseDatabase and checks that the row
    comes back as a dictionary.

    Returns:
        bool: True when the expected row was returned, False otherwise.
    """
    with UseDatabase(dbconfig) as db:
        result = db.run_query_vals('Select NULL from dual', None)
        res = result.fetchone()

    if res == {'NULL': None}:
        print("DB Module Test was successful! \n"
              "Queries return values in dictionaries."
              "\nTest query \'Select NULL from dual\' returned result: %s" % str(res))
        return True

    # Report the mismatch instead of finishing silently.
    print("DB Module Test failed! \n"
          "Test query 'Select NULL from dual' returned unexpected result: %s" % str(res))
    return False


# Run the smoke test only when this file is executed as a script.
if __name__ == "__main__":
    test()

最佳答案

这对我有用,但我不确定这是否是一个完美的解决方案,例如,尝试通过 for 循环进行多次插入会导致“获取连接失败;池已耗尽”错误。当我使用基于函数(而非基于类)的连接池时,我没有遇到这个问题。无论如何,为了避免这个问题,我只是用“cursor.executemany”一次性完成插入。

希望这对某人有帮助!

from mysql.connector.pooling import MySQLConnectionPool
from mysql.connector.errors import ProgrammingError, InterfaceError
from settings import config


# Connection settings and pool settings, both taken from the imported config.
dbconfig, dbconfig_pool = config.dbconfig, config.dbconfig_pool

#The following is my 'class DBasePool' content:
def __init__(self, dbconfig, dbconfig_pool):
    """Build the connection pool and check out one connection with a cursor.

    Args:
        dbconfig: Connection settings; stored on the instance and used by
            create_pool().
        dbconfig_pool: Mapping providing the 'pool_name' and 'pool_size' keys.

    Raises:
        ConnectionError: the connector raised InterfaceError.
        CredentialsError: the connector raised ProgrammingError.
        Exception: anything else is logged and re-raised unchanged.
    """
    self.dbconfig = dbconfig
    self.pool_name = dbconfig_pool['pool_name']
    self.pool_size = dbconfig_pool['pool_size']

    try:
        pool = self.create_pool(pool_name=self.pool_name, pool_size=self.pool_size)
        self.cnxpool = pool
        # NOTE(review): this connection stays checked out on the instance;
        # execute() draws its own connection from the pool, so this one
        # presumably reduces the usable pool size by one - confirm it is needed.
        connection = pool.get_connection()
        self.cnx = connection
        self.cursor = connection.cursor(buffered=True)
    except InterfaceError as exc:
        logger.error(exc)
        raise ConnectionError(exc)
    except ProgrammingError as exc:
        logger.error(exc)
        raise CredentialsError(exc)
    except Exception as exc:
        logger.error(exc)
        raise

def create_pool(self, pool_name, pool_size):
    """Return a new MySQLConnectionPool built from ``self.dbconfig``.

    Args:
        pool_name: Name given to the pool.
        pool_size: Size given to the pool.
    """
    pool = MySQLConnectionPool(
        pool_name=pool_name,
        pool_size=pool_size,
        **self.dbconfig
    )
    return pool

def close(self, cnx, cursor):
    """Close ``cursor`` and then ``cnx``.

    The connection is closed even when closing the cursor raises, so a
    failing cursor can never keep a connection checked out. Any exception
    from either close() call still propagates to the caller.

    Args:
        cnx: Connection object exposing close().
        cursor: Cursor object exposing close().
    """
    try:
        cursor.close()
    finally:
        cnx.close()

def execute(self, sql, data=None):
    """Run one statement on a connection borrowed from the pool.

    The borrowed connection and its cursor are always released, whether the
    statement affected rows, affected none, or raised. Leaving them open on
    the zero-row and error paths would use up the pool after a few calls.

    Args:
        sql: SQL statement to execute.
        data: Optional parameters passed to cursor.execute().

    Returns:
        The cursor's rowcount when it is non-zero (the work is committed
        first), otherwise 0.
    """
    # Get connection from the connection pool instead of creating one.
    cnx = self.cnxpool.get_connection()
    cursor = None
    try:
        cursor = cnx.cursor(buffered=True)
        cursor.execute(sql, data)

        rowcount = cursor.rowcount
        if rowcount:
            cnx.commit()
            return rowcount

        print('Could not insert record(s): {}, {}'.format(sql, data))
        return 0
    finally:
        if cursor is not None:
            self.close(cnx, cursor)
        else:
            # Cursor creation failed: only the connection needs releasing.
            cnx.close()

关于单独的数据库类中的 MySQL 连接池 - 如何?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/44595563/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com