- ubuntu12.04环境下使用kvm ioctl接口实现最简单的虚拟机
- Ubuntu 通过无线网络安装Ubuntu Server启动系统后连接无线网络的方法
- 在Ubuntu上搭建网桥的方法
- ubuntu 虚拟机上网方式及相关配置详解
CFSDN坚持开源创造价值,我们致力于搭建一个资源共享平台,让每一个IT人在这里找到属于你的精彩世界.
这篇CFSDN的博客文章python3 实现mysql数据库连接池的示例代码由作者收集整理,如果你对这篇文章有兴趣,记得点赞哟.
dbutils封装文件传送门 。
dbutils是一套python数据库连接池包,并允许对非线程安全的数据库接口进行线程安全包装。dbutils来自webware for python.
dbutils提供两种外部接口:
需要库 1、dbutils pip install dbutils 2、pymysql pip install pymysql/mysqldb 创建dbutils组件 。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
|
# -*- coding: utf-8 -*-
import
pymysql
# 数据库信息
db_test_host
=
"127.0.0.1"
db_test_port
=
3306
db_test_dbname
=
"ball"
db_test_user
=
"root"
db_test_password
=
"123456"
# 数据库连接编码
db_charset
=
"utf8"
# mincached : 启动时开启的闲置连接数量(缺省值 0 开始时不创建连接)
db_min_cached
=
10
# maxcached : 连接池中允许的闲置的最多连接数量(缺省值 0 代表不闲置连接池大小)
db_max_cached
=
10
# maxshared : 共享连接数允许的最大数量(缺省值 0 代表所有连接都是专用的)如果达到了最大数量,被请求为共享的连接将会被共享使用
db_max_shared
=
20
# maxconnecyions : 创建连接池的最大数量(缺省值 0 代表不限制)
db_max_connecyions
=
100
# blocking : 设置在连接池达到最大数量时的行为(缺省值 0 或 false 代表返回一个错误<tomany......> 其他代表阻塞直到连接数减少,连接被分配)
db_blocking
=
true
# maxusage : 单个连接的最大允许复用次数(缺省值 0 或 false 代表不限制的复用).当达到最大数时,连接会自动重新连接(关闭和重新打开)
db_max_usage
=
0
# setsession : 一个可选的sql命令列表用于准备每个会话,如["set datestyle to german", ...]
db_set_session
=
none
# creator : 使用连接数据库的模块
db_creator
=
pymysql
|
db_dbutils_init.py 创建数据池初始化 。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
|
from
dbutils.pooleddb
import
pooleddb
import
db_config as config
"""
@功能:创建数据库连接池
"""
class
myconnectionpool(
object
):
__pool
=
none
# def __init__(self):
# self.conn = self.__getconn()
# self.cursor = self.conn.cursor()
# 创建数据库连接conn和游标cursor
def
__enter__(
self
):
self
.conn
=
self
.__getconn()
self
.cursor
=
self
.conn.cursor()
# 创建数据库连接池
def
__getconn(
self
):
if
self
.__pool
is
none:
self
.__pool
=
pooleddb(
creator
=
config.db_creator,
mincached
=
config.db_min_cached,
maxcached
=
config.db_max_cached,
maxshared
=
config.db_max_shared,
maxconnections
=
config.db_max_connecyions,
blocking
=
config.db_blocking,
maxusage
=
config.db_max_usage,
setsession
=
config.db_set_session,
host
=
config.db_test_host,
port
=
config.db_test_port,
user
=
config.db_test_user,
passwd
=
config.db_test_password,
db
=
config.db_test_dbname,
use_unicode
=
false,
charset
=
config.db_charset
)
return
self
.__pool.connection()
# 释放连接池资源
def
__exit__(
self
, exc_type, exc_val, exc_tb):
self
.cursor.close()
self
.conn.close()
# 关闭连接归还给链接池
# def close(self):
# self.cursor.close()
# self.conn.close()
# 从连接池中取出一个连接
def
getconn(
self
):
conn
=
self
.__getconn()
cursor
=
conn.cursor()
return
cursor, conn
# 获取连接池,实例化
def
get_my_connection():
return
myconnectionpool()
|
制作mysqlhelper.py 。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
|
from
db_dbutils_init
import
get_my_connection
"""执行语句查询有结果返回结果没有返回0;增/删/改返回变更数据条数,没有返回0"""
class
mysqlhelper(
object
):
def
__init__(
self
):
self
.db
=
get_my_connection()
# 从数据池中获取连接
def
__new__(
cls
,
*
args,
*
*
kwargs):
if
not
hasattr
(
cls
,
'inst'
):
# 单例
cls
.inst
=
super
(mysqlhelper,
cls
).__new__(
cls
,
*
args,
*
*
kwargs)
return
cls
.inst
# 封装执行命令
def
execute(
self
, sql, param
=
none, autoclose
=
false):
"""
【主要判断是否有参数和是否执行完就释放连接】
:param sql: 字符串类型,sql语句
:param param: sql语句中要替换的参数"select %s from tab where id=%s" 其中的%s就是参数
:param autoclose: 是否关闭连接
:return: 返回连接conn和游标cursor
"""
cursor, conn
=
self
.db.getconn()
# 从连接池获取连接
count
=
0
try
:
# count : 为改变的数据条数
if
param:
count
=
cursor.execute(sql, param)
else
:
count
=
cursor.execute(sql)
conn.commit()
if
autoclose:
self
.close(cursor, conn)
except
exception as e:
pass
return
cursor, conn, count
# 执行多条命令
# def executemany(self, lis):
# """
# :param lis: 是一个列表,里面放的是每个sql的字典'[{"sql":"xxx","param":"xx"}....]'
# :return:
# """
# cursor, conn = self.db.getconn()
# try:
# for order in lis:
# sql = order['sql']
# param = order['param']
# if param:
# cursor.execute(sql, param)
# else:
# cursor.execute(sql)
# conn.commit()
# self.close(cursor, conn)
# return true
# except exception as e:
# print(e)
# conn.rollback()
# self.close(cursor, conn)
# return false
# 释放连接
def
close(
self
, cursor, conn):
"""释放连接归还给连接池"""
cursor.close()
conn.close()
# 查询所有
def
selectall(
self
, sql, param
=
none):
try
:
cursor, conn, count
=
self
.execute(sql, param)
res
=
cursor.fetchall()
return
res
except
exception as e:
print
(e)
self
.close(cursor, conn)
return
count
# 查询单条
def
selectone(
self
, sql, param
=
none):
try
:
cursor, conn, count
=
self
.execute(sql, param)
res
=
cursor.fetchone()
self
.close(cursor, conn)
return
res
except
exception as e:
print
(
"error_msg:"
, e.args)
self
.close(cursor, conn)
return
count
# 增加
def
insertone(
self
, sql, param):
try
:
cursor, conn, count
=
self
.execute(sql, param)
# _id = cursor.lastrowid() # 获取当前插入数据的主键id,该id应该为自动生成为好
conn.commit()
self
.close(cursor, conn)
return
count
# 防止表中没有id返回0
# if _id == 0:
# return true
# return _id
except
exception as e:
print
(e)
conn.rollback()
self
.close(cursor, conn)
return
count
# 增加多行
def
insertmany(
self
, sql, param):
"""
:param sql:
:param param: 必须是元组或列表[(),()]或((),())
:return:
"""
cursor, conn, count
=
self
.db.getconn()
try
:
cursor.executemany(sql, param)
conn.commit()
return
count
except
exception as e:
print
(e)
conn.rollback()
self
.close(cursor, conn)
return
count
# 删除
def
delete(
self
, sql, param
=
none):
try
:
cursor, conn, count
=
self
.execute(sql, param)
self
.close(cursor, conn)
return
count
except
exception as e:
print
(e)
conn.rollback()
self
.close(cursor, conn)
return
count
# 更新
def
update(
self
, sql, param
=
none):
try
:
cursor, conn, count
=
self
.execute(sql, param)
conn.commit()
self
.close(cursor, conn)
return
count
except
exception as e:
print
(e)
conn.rollback()
self
.close(cursor, conn)
return
count
if
__name__
=
=
'__main__'
:
db
=
mysqlhelper()
# # 查询单条
# sql1 = 'select * from userinfo where name=%s'
# args = 'python'
# ret = db.selectone(sql=sql1, param=args)
# print(ret) # (none, b'python', b'123456', b'0')
# 增加单条
# sql2 = 'insert into userinfo (name,password) values (%s,%s)'
# ret = db.insertone(sql2, ('old2','22222'))
# print(ret)
# 增加多条
# sql3 = 'insert into userinfo (name,password) values (%s,%s)'
# li = li = [
# ('分省', '123'),
# ('到达','456')
# ]
# ret = db.insertmany(sql3,li)
# print(ret)
# 删除
# sql4 = 'delete from userinfo where name=%s'
# args = 'xxxx'
# ret = db.delete(sql4, args)
# print(ret)
# 更新
# sql5 = r'update userinfo set password=%s where name like %s'
# args = ('993333993', '%old%')
# ret = db.update(sql5, args)
# print(ret)
|
原理 。
python编程中可以使用mysqldb进行数据库的连接及诸如查询/插入/更新等操作,但是每次连接mysql数据库请求时,都是独立的去请求访问,相当浪费资源, 。
而且访问数量达到一定数量时,对mysql的性能会产生较大的影响.
因此,实际使用中,通常会使用数据库的连接池技术,来访问数据库达到资源复用的目的.
1
|
pip3 install dbutils
|
dbutils是一套python数据库连接池包,并允许对非线程安全的数据库接口进行线程安全包装。dbutils来自webware for python.
dbutils提供两种外部接口:
下载地址:dbutils 下载解压后,使用python setup.py install 命令进行安装 。
下面利用mysqldb和dbutils建立自己的mysql数据库连接池工具包 。
在工程目录下新建package命名为:dbconnecttion,并新建module命名为mysqlconn,下面是mysqlconn.py,该模块创建mysql的连接池对象,并创建了如查询/插入等通用的操作方法。该部分代码实现如下:
还有很多其他参数可以配置:
dbapi :数据库接口 mincached :启动时开启的空连接数量 maxcached :连接池最大可用连接数量 maxshared :连接池最大可共享连接数量 maxconnections :最大允许连接数量 blocking :达到最大数量时是否阻塞 maxusage :单个连接最大复用次数 。
根据自己的需要合理配置上述的资源参数,以满足自己的实际需要.
代码:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
|
#!/usr/bin/env python
# -*- coding:utf-8 -*-
import
pymysql, os, configparser
from
pymysql.cursors
import
dictcursor
from
dbutils.pooleddb
import
pooleddb
class
config(
object
):
"""
# config().get_content("user_information")
配置文件里面的参数
[notdbmysql]
host = 192.168.1.101
port = 3306
user = root
password = python123
"""
def
__init__(
self
, config_filename
=
"myprojectconfig.cnf"
):
file_path
=
os.path.join(os.path.dirname(__file__), config_filename)
self
.cf
=
configparser.configparser()
self
.cf.read(file_path)
def
get_sections(
self
):
return
self
.cf.sections()
def
get_options(
self
, section):
return
self
.cf.options(section)
def
get_content(
self
, section):
result
=
{}
for
option
in
self
.get_options(section):
value
=
self
.cf.get(section, option)
result[option]
=
int
(value)
if
value.isdigit()
else
value
return
result
class
basepymysqlpool(
object
):
def
__init__(
self
, host, port, user, password, db_name
=
none):
self
.db_host
=
host
self
.db_port
=
int
(port)
self
.user
=
user
self
.password
=
str
(password)
self
.db
=
db_name
self
.conn
=
none
self
.cursor
=
none
class
mypymysqlpool(basepymysqlpool):
"""
mysql数据库对象,负责产生数据库连接 , 此类中的连接采用连接池实现获取连接对象:conn = mysql.getconn()
释放连接对象;conn.close()或del conn
"""
# 连接池对象
__pool
=
none
def
__init__(
self
, conf_name
=
none):
self
.conf
=
config().get_content(conf_name)
super
(mypymysqlpool,
self
).__init__(
*
*
self
.conf)
# 数据库构造函数,从连接池中取出连接,并生成操作游标
self
._conn
=
self
.__getconn()
self
._cursor
=
self
._conn.cursor()
def
__getconn(
self
):
"""
@summary: 静态方法,从连接池中取出连接
@return mysqldb.connection
"""
if
mypymysqlpool.__pool
is
none:
__pool
=
pooleddb(creator
=
pymysql,
mincached
=
1
,
maxcached
=
20
,
host
=
self
.db_host,
port
=
self
.db_port,
user
=
self
.user,
passwd
=
self
.password,
db
=
self
.db,
use_unicode
=
false,
charset
=
"utf8"
,
cursorclass
=
dictcursor)
return
__pool.connection()
def
getall(
self
, sql, param
=
none):
"""
@summary: 执行查询,并取出所有结果集
@param sql:查询sql,如果有查询条件,请只指定条件列表,并将条件值使用参数[param]传递进来
@param param: 可选参数,条件列表值(元组/列表)
@return: result list(字典对象)/boolean 查询到的结果集
"""
if
param
is
none:
count
=
self
._cursor.execute(sql)
else
:
count
=
self
._cursor.execute(sql, param)
if
count >
0
:
result
=
self
._cursor.fetchall()
else
:
result
=
false
return
result
def
getone(
self
, sql, param
=
none):
"""
@summary: 执行查询,并取出第一条
@param sql:查询sql,如果有查询条件,请只指定条件列表,并将条件值使用参数[param]传递进来
@param param: 可选参数,条件列表值(元组/列表)
@return: result list/boolean 查询到的结果集
"""
if
param
is
none:
count
=
self
._cursor.execute(sql)
else
:
count
=
self
._cursor.execute(sql, param)
if
count >
0
:
result
=
self
._cursor.fetchone()
else
:
result
=
false
return
result
def
getmany(
self
, sql, num, param
=
none):
"""
@summary: 执行查询,并取出num条结果
@param sql:查询sql,如果有查询条件,请只指定条件列表,并将条件值使用参数[param]传递进来
@param num:取得的结果条数
@param param: 可选参数,条件列表值(元组/列表)
@return: result list/boolean 查询到的结果集
"""
if
param
is
none:
count
=
self
._cursor.execute(sql)
else
:
count
=
self
._cursor.execute(sql, param)
if
count >
0
:
result
=
self
._cursor.fetchmany(num)
else
:
result
=
false
return
result
def
insertmany(
self
, sql, values):
"""
@summary: 向数据表插入多条记录
@param sql:要插入的sql格式
@param values:要插入的记录数据tuple(tuple)/list[list]
@return: count 受影响的行数
"""
count
=
self
._cursor.executemany(sql, values)
return
count
def
__query(
self
, sql, param
=
none):
if
param
is
none:
count
=
self
._cursor.execute(sql)
else
:
count
=
self
._cursor.execute(sql, param)
return
count
def
update(
self
, sql, param
=
none):
"""
@summary: 更新数据表记录
@param sql: sql格式及条件,使用(%s,%s)
@param param: 要更新的 值 tuple/list
@return: count 受影响的行数
"""
return
self
.__query(sql, param)
def
insert(
self
, sql, param
=
none):
"""
@summary: 更新数据表记录
@param sql: sql格式及条件,使用(%s,%s)
@param param: 要更新的 值 tuple/list
@return: count 受影响的行数
"""
return
self
.__query(sql, param)
def
delete(
self
, sql, param
=
none):
"""
@summary: 删除数据表记录
@param sql: sql格式及条件,使用(%s,%s)
@param param: 要删除的条件 值 tuple/list
@return: count 受影响的行数
"""
return
self
.__query(sql, param)
def
begin(
self
):
"""
@summary: 开启事务
"""
self
._conn.autocommit(
0
)
def
end(
self
, option
=
'commit'
):
"""
@summary: 结束事务
"""
if
option
=
=
'commit'
:
self
._conn.commit()
else
:
self
._conn.rollback()
def
dispose(
self
, isend
=
1
):
"""
@summary: 释放连接池资源
"""
if
isend
=
=
1
:
self
.end(
'commit'
)
else
:
self
.end(
'rollback'
)
self
._cursor.close()
self
._conn.close()
if
__name__
=
=
'__main__'
:
mysql
=
mypymysqlpool(
"notdbmysql"
)
sqlall
=
"select * from mytest.aa;"
result
=
mysql.getall(sqlall)
print
(result)
sqlall
=
"select * from mytest.aa;"
result
=
mysql.getmany(sqlall,
2
)
print
(result)
result
=
mysql.getone(sqlall)
print
(result)
# mysql.insert("insert into mytest.aa set a=%s", (1))
# 释放资源
mysql.dispose()
|
参考博客:https://www.cnblogs.com/renfanzi/p/7656142.html 。
到此这篇关于python3 实现mysql数据库连接池的示例代码的文章就介绍到这了,更多相关python3 mysql连接池内容请搜索我以前的文章或继续浏览下面的相关文章希望大家以后多多支持我! 。
原文链接:https://www.cnblogs.com/jiangxiaobo/p/12786205.html 。
最后此篇关于python3 实现mysql数据库连接池的示例代码的文章就讲到这里了,如果你想了解更多关于python3 实现mysql数据库连接池的示例代码的内容请搜索CFSDN的文章或继续浏览相关文章,希望大家以后支持我的博客! 。
背景: 我最近一直在使用 JPA,我为相当大的关系数据库项目生成持久层的轻松程度给我留下了深刻的印象。 我们公司使用大量非 SQL 数据库,特别是面向列的数据库。我对可能对这些数据库使用 JPA 有一
我已经在我的 maven pom 中添加了这些构建配置,因为我希望将 Apache Solr 依赖项与 Jar 捆绑在一起。否则我得到了 SolarServerException: ClassNotF
interface ITurtle { void Fight(); void EatPizza(); } interface ILeonardo : ITurtle {
我希望可用于 Java 的对象/关系映射 (ORM) 工具之一能够满足这些要求: 使用 JPA 或 native SQL 查询获取大量行并将其作为实体对象返回。 允许在行(实体)中进行迭代,并在对当前
好像没有,因为我有实现From for 的代码, 我可以转换 A到 B与 .into() , 但同样的事情不适用于 Vec .into()一个Vec . 要么我搞砸了阻止实现派生的事情,要么这不应该发
在 C# 中,如果 A 实现 IX 并且 B 继承自 A ,是否必然遵循 B 实现 IX?如果是,是因为 LSP 吗?之间有什么区别吗: 1. Interface IX; Class A : IX;
就目前而言,这个问题不适合我们的问答形式。我们希望答案得到事实、引用资料或专业知识的支持,但这个问题可能会引发辩论、争论、投票或扩展讨论。如果您觉得这个问题可以改进并可能重新打开,visit the
我正在阅读标准haskell库的(^)的实现代码: (^) :: (Num a, Integral b) => a -> b -> a x0 ^ y0 | y0 a -> b ->a expo x0
我将把国际象棋游戏表示为 C++ 结构。我认为,最好的选择是树结构(因为在每个深度我们都有几个可能的移动)。 这是一个好的方法吗? struct TreeElement{ SomeMoveType
我正在为用户名数据库实现字符串匹配算法。我的方法采用现有的用户名数据库和用户想要的新用户名,然后检查用户名是否已被占用。如果采用该方法,则该方法应该返回带有数据库中未采用的数字的用户名。 例子: “贾
我正在尝试实现 Breadth-first search algorithm , 为了找到两个顶点之间的最短距离。我开发了一个 Queue 对象来保存和检索对象,并且我有一个二维数组来保存两个给定顶点
我目前正在 ika 中开发我的 Python 游戏,它使用 python 2.5 我决定为 AI 使用 A* 寻路。然而,我发现它对我的需要来说太慢了(3-4 个敌人可能会落后于游戏,但我想供应 4-
我正在寻找 Kademlia 的开源实现C/C++ 中的分布式哈希表。它必须是轻量级和跨平台的(win/linux/mac)。 它必须能够将信息发布到 DHT 并检索它。 最佳答案 OpenDHT是
我在一本书中读到这一行:-“当我们要求 C++ 实现运行程序时,它会通过调用此函数来实现。” 而且我想知道“C++ 实现”是什么意思或具体是什么。帮忙!? 最佳答案 “C++ 实现”是指编译器加上链接
我正在尝试使用分支定界的 C++ 实现这个背包问题。此网站上有一个 Java 版本:Implementing branch and bound for knapsack 我试图让我的 C++ 版本打印
在很多情况下,我需要在 C# 中访问合适的哈希算法,从重写 GetHashCode 到对数据执行快速比较/查找。 我发现 FNV 哈希是一种非常简单/好/快速的哈希算法。但是,我从未见过 C# 实现的
目录 LRU缓存替换策略 核心思想 不适用场景 算法基本实现 算法优化
1. 绪论 在前面文章中提到 空间直角坐标系相互转换 ,测绘坐标转换时,一般涉及到的情况是:两个直角坐标系的小角度转换。这个就是我们经常在测绘数据处理中,WGS-84坐标系、54北京坐标系
在软件开发过程中,有时候我们需要定时地检查数据库中的数据,并在发现新增数据时触发一个动作。为了实现这个需求,我们在 .Net 7 下进行一次简单的演示. PeriodicTimer .
二分查找 二分查找算法,说白了就是在有序的数组里面给予一个存在数组里面的值key,然后将其先和数组中间的比较,如果key大于中间值,进行下一次mid后面的比较,直到找到相等的,就可以得到它的位置。
我是一名优秀的程序员,十分优秀!