gpt4 book ai didi

python - 为什么我在 python 3.5 中使用 asyncio 时会忽略异常

转载 作者:太空宇宙 更新时间:2023-11-04 00:56:28 25 4
gpt4 key购买 nike

信息:

  • python 3.5
  • 使用 SublimeREPL 运行代码
  • 异步

异常回溯如下

Exception ignored in: <bound method Connection.__del__ of <aiomysql.connection.Connection object at 0x00000030F8080B38>>
Traceback (most recent call last):
File "C:\software\development\python3.5\lib\site-packages\aiomysql\connection.py", line 689, in __del__
File "C:\software\development\python3.5\lib\site-packages\aiomysql\connection.py", line 261, in close
File "C:\software\development\python3.5\lib\asyncio\selector_events.py", line 569, in close
File "C:\software\development\python3.5\lib\asyncio\base_events.py", line 447, in call_soon
File "C:\software\development\python3.5\lib\asyncio\base_events.py", line 456, in _call_soon
File "C:\software\development\python3.5\lib\asyncio\base_events.py", line 284, in _check_closed
RuntimeError: Event loop is closed

我实现了一个简单的ORM框架,带有一些处理SQL的功能。orm.py中的一些相关代码(忽略中文注释)如下。类 Model 中的 update 或 findAll 方法运行良好,确实正确地给出了结果,但每次运行 test_method 后,它都会给出异常。

@asyncio.coroutine
def create_pool(loop, **kw): # 引入关键字后不用显示import asyncio了
# 该函数用于创建连接池
global __pool # 全局变量用于保存连接池
__pool = yield from aiomysql.create_pool(
host=kw.get('host', 'localhost'), # 默认定义host名字为localhost
port=kw.get('port', 3306), # 默认定义mysql的默认端口是3306
user=kw['user'], # user是通过关键字参数传进来的
password=kw['password'], # 密码也是通过关键字参数传进来的
db=kw['db'], # 数据库的名字
charset=kw.get('charset', 'utf8'), # 默认数据库字符集是utf8
autocommit=kw.get('autocommit', True), # 默认自动提交事务
maxsize=kw.get('maxsize', 10), # 连接池最多同时处理10个请求
minsize=kw.get('minsize', 1), # 连接池最少1个请求
loop=loop # 传递消息循环对象loop用于异步执行
)
@asyncio.coroutine
def execute(sql, args, autocommit=True):
# execute方法只返回结果数,不返回结果集,用于insert,update这些SQL语句
log(sql)
with (yield from __pool) as conn:
if not autocommit:
yield from conn.begin()
try:
cur = yield from conn.cursor()
# 执行sql语句,同时替换占位符
# pdb.set_trace()
yield from cur.execute(sql.replace('?', '%s'), args)
affected = cur.rowcount # 返回受影响的行数
yield from cur.close() # 关闭游标
if not autocommit:
yield from conn.commit()
except BaseException as e:
if not autocommit:
yield from conn.rollback()
raise e # raise不带参数,则把此处的错误往上抛;为了方便理解还是建议加e吧
return affected


class Model(dict, metaclass=ModelMetaclass):
# 继承dict是为了使用方便,例如对象实例user['id']即可轻松通过UserModel去数据库获取到id
# 元类自然是为了封装我们之前写的具体的SQL处理函数,从数据库获取数据

def __init__(self, **kw):
# 调用dict的父类__init__方法用于创建Model,super(类名,类对象)
super(Model, self).__init__(**kw)

def __getattr__(self, key):
# 调用不存在的属性时返回一些内容
try:
return self[key] # 如果存在则正常返回
except KeyError:
raise AttributeError(
r"'Model' object has no attribute '%s'" % key) # r表示不转义

def __setattr__(self, key, value):
# 设定Model里面的key-value对象,这里value允许为None
self[key] = value

def getValue(self, key):
# 获取某个具体的值,肯定存在的情况下使用该函数,否则会使用__getattr()__
# 获取实例的key,None是默认值,getattr方法使用可以参考http://kaimingwan.com/post/python/pythonzhong-de-nei-zhi-han-shu-getattr-yu-fan-she
return getattr(self, key, None)

def getValueOrDefault(self, key):
# 这个方法当value为None的时候能够返回默认值
value = getattr(self, key, None)
if value is None: # 不存在这样的值则直接返回
# self.__mapping__在metaclass中,用于保存不同实例属性在Model基类中的映射关系
field = self.__mappings__[key]
if field.default is not None: # 如果实例的域存在默认值,则使用默认值
# field.default是callable的话则直接调用
value = field.default() if callable(field.default) else field.default
logging.debug('using default value for %s:%s' %
(key, str(value)))
setattr(self, key, value)
return value


# --------------------------每个Model类的子类实例应该具备的执行SQL的方法比如save------
@classmethod # 类方法
@asyncio.coroutine
def findAll(cls, where=None, args=None, **kw):
sql = [cls.__select__] # 获取默认的select语句
if where: # 如果有where语句,则修改sql变量
# 这里不用协程,是因为不需要等待数据返回
sql.append('where') # sql里面加上where关键字
sql.append(where) # 这里的where实际上是colName='xxx'这样的条件表达式
if args is None: # 什么参数?
args = []

orderBy = kw.get('orderBy', None) # 从kw中查看是否有orderBy属性
if orderBy:
sql.append('order by')
sql.append(orderBy)

limit = kw.get('limit', None) # mysql中可以使用limit关键字
if limit is not None:
sql.append('limit')
if isinstance(limit, int): # 如果是int类型则增加占位符
sql.append('?')
args.append(limit)
elif isinstance(limit, tuple) and len(limit) == 2: # limit可以取2个参数,表示一个范围
sql.append('?,?')
args.extend(limit)
else: # 其他情况自然是语法问题
raise ValueError('Invalid limit value: %s' % str(limit))
# 在原来默认SQL语句后面再添加语句,要加个空格

rs = yield from select(' '.join(sql), args)
return [cls(**r) for r in rs] # 返回结果,结果是list对象,里面的元素是dict类型的

@classmethod
@asyncio.coroutine
def findNumber(cls, selectField, where=None, args=None):
# 获取行数
# 这里的 _num_ 什么意思?别名? 我估计是mysql里面一个记录实时查询结果条数的变量
sql = ['select %s _num_ from `%s`' % (selectField, cls.__table__)]
# pdb.set_trace()
if where:
sql.append('where')
sql.append(where) # 这里不加空格?
rs = yield from select(' '.join(sql), args, 1) # size = 1
if len(rs) == 0: # 结果集为0的情况
return None
return rs[0]['_num_'] # 有结果则rs这个list中第一个词典元素_num_这个key的value值

@classmethod
@asyncio.coroutine
def find_by_key(cls, pk):
# 根据主键查找
# pk是dict对象
rs = yield from select('%s where `%s`=?' % (cls.__select__, cls.__primary_key__), [pk], 1)
if len(rs) == 0:
return None
return cls(**rs[0])

# 这个是实例方法
@asyncio.coroutine
def save(self):
# arg是保存所有Model实例属性和主键的list,使用getValueOrDefault方法的好处是保存默认值
# 将自己的fields保存进去
args = list(map(self.getValueOrDefault, self.__fields__))
args.append(self.getValueOrDefault(self.__primary_key__))
# pdb.set_trace()
rows = yield from execute(self.__insert__, args) # 使用默认插入函数
if rows != 1:
# 插入失败就是rows!=1
logging.warn(
'failed to insert record: affected rows: %s' % rows)

@asyncio.coroutine
def update(self):
# 这里使用getValue说明只能更新那些已经存在的值,因此不能使用getValueOrDefault方法
args = list(map(self.getValue, self.__fields__))
args.append(self.getValue(self.__primary_key__))
# pdb.set_trace()
rows = yield from execute(self.__update__, args) # args是属性的list
if rows != 1:
logging.warn(
'failed to update by primary key: affected rows: %s' % rows)

@asyncio.coroutine
def remove(self):
args = [self.getValue(self.__primary_key__)]
# pdb.set_trace()
rows = yield from execute(self.__delete__, args)
if rows != 1:
logging.warn(
'failed to remove by primary key: affected rows: %s' % rows)

测试文件如下。

from models import User
import asyncio
import sys
import orm
import pdb
import time

# import pdb

# 测试插入


@asyncio.coroutine
def test_save(loop):
yield from orm.create_pool(loop, user='kami', password='kami', db='pure_blog')
u = User(name='hi', email='hi@example.com',
passwd='hi', image='about:blank')
# pdb.set_trace()
yield from u.save()

# 测试查询


@asyncio.coroutine
def test_findAll(loop):
yield from orm.create_pool(loop, user='kami', password='kami', db='pure_blog')
# 这里给的关键字参数按照xxx='xxx'的形式给出,会自动分装成dict
rs = yield from User.findAll(email='test@example.com') # rs是一个元素为dict的list
# pdb.set_trace()
for i in range(len(rs)):
print(rs[i])

# 查询条数?


@asyncio.coroutine
def test_findNumber(loop):
yield from orm.create_pool(loop, user='kami', password='kami', db='pure_blog')
count = yield from User.findNumber('email')
print(count)

# 根据主键查找,这里试ID


@asyncio.coroutine
def test_find_by_key(loop):
yield from orm.create_pool(loop, user='kami', password='kami', db='pure_blog')
# rs是一个dict
# ID请自己通过数据库查询
rs = yield from User.find_by_key('0014531826762080b29033a78624bc68c867550778f64d6000')
print(rs)

# 根据主键删除


@asyncio.coroutine
def test_remove(loop):
yield from orm.create_pool(loop, user='kami', password='kami', db='pure_blog')
# 用id初始化一个实例对象
u = User(id='0014531826762080b29033a78624bc68c867550778f64d6000')
yield from u.remove()


# 根据主键更新
@asyncio.coroutine
def test_update(loop):
yield from orm.create_pool(loop, user='kami', password='kami', db='pure_blog')
# 必须按照列的顺序来初始化:'update `users` set `created_at`=?, `passwd`=?, `image`=?,
# `admin`=?, `name`=?, `email`=? where `id`=?' 注意这里要使用time()方法,否则会直接返回个时间戳对象,而不是float值
u = User(id='00145318300622886f186530ee74afabecedb42f9cd590a000', created_at=time.time(), passwd='test',
image='about:blank', admin=True, name='test', email='hello1@example.com') # id必须和数据库一直,其他属性可以设置成新的值,属性要全
# pdb.set_trace()
yield from u.update()


loop = asyncio.get_event_loop() # 获取消息循环对象
loop.run_until_complete(test_update(loop)) # 执行协程
loop.close()

最佳答案

在关闭事件循环之前,需要关闭连接池,参见docs

pool.close()
yield from pool.wait_closed()

在你的情况下:

loop = asyncio.get_event_loop()
loop.run_until_complete(test_update(loop))
__pool.close()
loop.run_until_complete(__pool.wait_closed())
loop.close()

关于python - 为什么我在 python 3.5 中使用 asyncio 时会忽略异常,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/34873111/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com