为什么我在 python 3.5 中使用 asyncio 时会忽略异常

Why I got ignored exception when I use asyncio in python 3.5

信息:

异常回溯如下

Exception ignored in: <bound method Connection.__del__ of <aiomysql.connection.Connection object at 0x00000030F8080B38>>
Traceback (most recent call last):
  File "C:\software\development\python3.5\lib\site-packages\aiomysql\connection.py", line 689, in __del__
  File "C:\software\development\python3.5\lib\site-packages\aiomysql\connection.py", line 261, in close
  File "C:\software\development\python3.5\lib\asyncio\selector_events.py", line 569, in close
  File "C:\software\development\python3.5\lib\asyncio\base_events.py", line 447, in call_soon
  File "C:\software\development\python3.5\lib\asyncio\base_events.py", line 456, in _call_soon
  File "C:\software\development\python3.5\lib\asyncio\base_events.py", line 284, in _check_closed
RuntimeError: Event loop is closed

我实现了一个简单的ORM框架,其中包含一些函数来处理orm.py中的SQL.Some相关代码(忽略中文注释)如下。 class 模型中的 update 或 findAll 方法运行良好并且确实正确地给出了结果但是每次我 运行 宁 test_method 之后,它都会给出异常。

@asyncio.coroutine
def create_pool(loop, **kw):        # 引入关键字后不用显示import asyncio了
    # 该函数用于创建连接池
    global __pool  # 全局变量用于保存连接池
    __pool = yield from aiomysql.create_pool(
        host=kw.get('host', 'localhost'),  # 默认定义host名字为localhost
        port=kw.get('port', 3306),      # 默认定义mysql的默认端口是3306
        user=kw['user'],                # user是通过关键字参数传进来的
        password=kw['password'],        # 密码也是通过关键字参数传进来的
        db=kw['db'],                    # 数据库的名字
        charset=kw.get('charset', 'utf8'),  # 默认数据库字符集是utf8
        autocommit=kw.get('autocommit', True),  # 默认自动提交事务
        maxsize=kw.get('maxsize', 10),      # 连接池最多同时处理10个请求
        minsize=kw.get('minsize', 1),       # 连接池最少1个请求
        loop=loop       # 传递消息循环对象loop用于异步执行
    )
@asyncio.coroutine
def execute(sql, args, autocommit=True):
    # execute方法只返回结果数,不返回结果集,用于insert,update这些SQL语句
    log(sql)
    with (yield from __pool) as conn:
        if not autocommit:
            yield from conn.begin()
        try:
            cur = yield from conn.cursor()
            # 执行sql语句,同时替换占位符
            # pdb.set_trace()
            yield from cur.execute(sql.replace('?', '%s'), args)
            affected = cur.rowcount     # 返回受影响的行数
            yield from cur.close()       # 关闭游标
            if not autocommit:
                yield from conn.commit()
        except BaseException as e:
            if not autocommit:
                yield from conn.rollback()
            raise e  # raise不带参数,则把此处的错误往上抛;为了方便理解还是建议加e吧
        return affected


class Model(dict, metaclass=ModelMetaclass):
    # 继承dict是为了使用方便,例如对象实例user['id']即可轻松通过UserModel去数据库获取到id
    # 元类自然是为了封装我们之前写的具体的SQL处理函数,从数据库获取数据

    def __init__(self, **kw):
        # 调用dict的父类__init__方法用于创建Model,super(类名,类对象)
        super(Model, self).__init__(**kw)

    def __getattr__(self, key):
        # 调用不存在的属性时返回一些内容
        try:
            return self[key]  # 如果存在则正常返回
        except KeyError:
            raise AttributeError(
                r"'Model' object has no attribute '%s'" % key)      # r表示不转义

    def __setattr__(self, key, value):
        # 设定Model里面的key-value对象,这里value允许为None
        self[key] = value

    def getValue(self, key):
        # 获取某个具体的值,肯定存在的情况下使用该函数,否则会使用__getattr()__
        # 获取实例的key,None是默认值,getattr方法使用可以参考http://kaimingwan.com/post/python/pythonzhong-de-nei-zhi-han-shu-getattr-yu-fan-she
        return getattr(self, key, None)

    def getValueOrDefault(self, key):
        # 这个方法当value为None的时候能够返回默认值
        value = getattr(self, key, None)
        if value is None:       # 不存在这样的值则直接返回
            # self.__mapping__在metaclass中,用于保存不同实例属性在Model基类中的映射关系
            field = self.__mappings__[key]
            if field.default is not None:  # 如果实例的域存在默认值,则使用默认值
                # field.default是callable的话则直接调用
                value = field.default() if callable(field.default) else field.default
                logging.debug('using default value for %s:%s' %
                              (key, str(value)))
                setattr(self, key, value)
        return value


# --------------------------每个Model类的子类实例应该具备的执行SQL的方法比如save------
    @classmethod    # 类方法
    @asyncio.coroutine
    def findAll(cls, where=None, args=None, **kw):
        sql = [cls.__select__]  # 获取默认的select语句
        if where:   # 如果有where语句,则修改sql变量
            # 这里不用协程,是因为不需要等待数据返回
            sql.append('where')  # sql里面加上where关键字
            sql.append(where)   # 这里的where实际上是colName='xxx'这样的条件表达式
        if args is None:    # 什么参数?
            args = []

        orderBy = kw.get('orderBy', None)    # 从kw中查看是否有orderBy属性
        if orderBy:
            sql.append('order by')
            sql.append(orderBy)

        limit = kw.get('limit', None)    # mysql中可以使用limit关键字
        if limit is not None:
            sql.append('limit')
            if isinstance(limit, int):   # 如果是int类型则增加占位符
                sql.append('?')
                args.append(limit)
            elif isinstance(limit, tuple) and len(limit) == 2:   # limit可以取2个参数,表示一个范围
                sql.append('?,?')
                args.extend(limit)
            else:       # 其他情况自然是语法问题
                raise ValueError('Invalid limit value: %s' % str(limit))
            # 在原来默认SQL语句后面再添加语句,要加个空格

        rs = yield from select(' '.join(sql), args)
        return [cls(**r) for r in rs]   # 返回结果,结果是list对象,里面的元素是dict类型的

    @classmethod
    @asyncio.coroutine
    def findNumber(cls, selectField, where=None, args=None):
        # 获取行数
        # 这里的 _num_ 什么意思?别名? 我估计是mysql里面一个记录实时查询结果条数的变量
        sql = ['select %s _num_ from `%s`' % (selectField, cls.__table__)]
        # pdb.set_trace()
        if where:
            sql.append('where')
            sql.append(where)   # 这里不加空格?
        rs = yield from select(' '.join(sql), args, 1)  # size = 1
        if len(rs) == 0:  # 结果集为0的情况
                return None
        return rs[0]['_num_']   # 有结果则rs这个list中第一个词典元素_num_这个key的value值

    @classmethod
    @asyncio.coroutine
    def find_by_key(cls, pk):
        # 根据主键查找
        # pk是dict对象
        rs = yield from select('%s where `%s`=?' % (cls.__select__, cls.__primary_key__), [pk], 1)
        if len(rs) == 0:
            return None
        return cls(**rs[0])

    # 这个是实例方法
    @asyncio.coroutine
    def save(self):
        # arg是保存所有Model实例属性和主键的list,使用getValueOrDefault方法的好处是保存默认值
        # 将自己的fields保存进去
        args = list(map(self.getValueOrDefault, self.__fields__))
        args.append(self.getValueOrDefault(self.__primary_key__))
        # pdb.set_trace()
        rows = yield from execute(self.__insert__, args)  # 使用默认插入函数
        if rows != 1:
            # 插入失败就是rows!=1
            logging.warn(
                'failed to insert record: affected rows: %s' % rows)

    @asyncio.coroutine
    def update(self):
        # 这里使用getValue说明只能更新那些已经存在的值,因此不能使用getValueOrDefault方法
        args = list(map(self.getValue, self.__fields__))
        args.append(self.getValue(self.__primary_key__))
        # pdb.set_trace()
        rows = yield from execute(self.__update__, args)    # args是属性的list
        if rows != 1:
            logging.warn(
                'failed to update by primary key: affected rows: %s' % rows)

    @asyncio.coroutine
    def remove(self):
        args = [self.getValue(self.__primary_key__)]
        # pdb.set_trace()
        rows = yield from execute(self.__delete__, args)
        if rows != 1:
            logging.warn(
                'failed to remove by primary key: affected rows: %s' % rows)

测试文件如下

from models import User
import asyncio
import sys
import orm
import pdb
import time

# import pdb

# 测试插入


@asyncio.coroutine
def test_save(loop):
    yield from orm.create_pool(loop, user='kami', password='kami', db='pure_blog')
    u = User(name='hi', email='hi@example.com',
             passwd='hi', image='about:blank')
    # pdb.set_trace()
    yield from u.save()

# 测试查询


@asyncio.coroutine
def test_findAll(loop):
    yield from orm.create_pool(loop, user='kami', password='kami', db='pure_blog')
    # 这里给的关键字参数按照xxx='xxx'的形式给出,会自动分装成dict
    rs = yield from User.findAll(email='test@example.com')      # rs是一个元素为dict的list
    # pdb.set_trace()
    for i in range(len(rs)):
        print(rs[i])

# 查询条数?


@asyncio.coroutine
def test_findNumber(loop):
    yield from orm.create_pool(loop, user='kami', password='kami', db='pure_blog')
    count = yield from User.findNumber('email')
    print(count)

# 根据主键查找,这里试ID


@asyncio.coroutine
def test_find_by_key(loop):
    yield from orm.create_pool(loop, user='kami', password='kami', db='pure_blog')
    # rs是一个dict
    # ID请自己通过数据库查询
    rs = yield from User.find_by_key('0014531826762080b29033a78624bc68c867550778f64d6000')
    print(rs)

# 根据主键删除


@asyncio.coroutine
def test_remove(loop):
    yield from orm.create_pool(loop, user='kami', password='kami', db='pure_blog')
    # 用id初始化一个实例对象
    u = User(id='0014531826762080b29033a78624bc68c867550778f64d6000')
    yield from u.remove()


# 根据主键更新
@asyncio.coroutine
def test_update(loop):
    yield from orm.create_pool(loop, user='kami', password='kami', db='pure_blog')
    # 必须按照列的顺序来初始化:'update `users` set `created_at`=?, `passwd`=?, `image`=?,
    # `admin`=?, `name`=?, `email`=? where `id`=?' 注意这里要使用time()方法,否则会直接返回个时间戳对象,而不是float值
    u = User(id='00145318300622886f186530ee74afabecedb42f9cd590a000', created_at=time.time(), passwd='test',
             image='about:blank', admin=True, name='test', email='hello1@example.com')  # id必须和数据库一直,其他属性可以设置成新的值,属性要全
    # pdb.set_trace()
    yield from u.update()


loop = asyncio.get_event_loop()  # 获取消息循环对象
loop.run_until_complete(test_update(loop))  # 执行协程
loop.close()

我尝试将 "run_until_complete" 方法放入协程中(请参阅此代码末尾的 execute_test 方法),它似乎有效。但我仍然不知道为什么。

from models import User
import asyncio
import sys
import orm
import pdb
import time

# import pdb

# 测试插入


@asyncio.coroutine
def test_save(loop):
    yield from orm.create_pool(loop, user='kami', password='kami', db='pure_blog')
    u = User(name='hi', email='hi@example.com',
             passwd='hi', image='about:blank')
    # pdb.set_trace()
    yield from u.save()

# 测试查询


@asyncio.coroutine
def test_findAll(loop):
    yield from orm.create_pool(loop, user='kami', password='kami', db='pure_blog')
    # 这里给的关键字参数按照xxx='xxx'的形式给出,会自动分装成dict
    rs = yield from User.findAll(email='test@example.com')      # rs是一个元素为dict的list
    # pdb.set_trace()
    for i in range(len(rs)):
        print(rs[i])

# 查询条数?


@asyncio.coroutine
def test_findNumber(loop):
    yield from orm.create_pool(loop, user='kami', password='kami', db='pure_blog')
    count = yield from User.findNumber('email')
    print(count)

# 根据主键查找,这里试ID


@asyncio.coroutine
def test_find_by_key(loop):
    yield from orm.create_pool(loop, user='kami', password='kami', db='pure_blog')
    # rs是一个dict
    # ID请自己通过数据库查询
    rs = yield from User.find_by_key('0014531826762080b29033a78624bc68c867550778f64d6000')
    print(rs)

# 根据主键删除


@asyncio.coroutine
def test_remove(loop):
    yield from orm.create_pool(loop, user='kami', password='kami', db='pure_blog')
    # 用id初始化一个实例对象
    u = User(id='0014531826762080b29033a78624bc68c867550778f64d6000')
    yield from u.remove()


# 根据主键更新
@asyncio.coroutine
def test_update(loop):
    yield from orm.create_pool(loop, user='kami', password='kami', db='pure_blog')
    # 必须按照列的顺序来初始化:'update `users` set `created_at`=?, `passwd`=?, `image`=?,
    # `admin`=?, `name`=?, `email`=? where `id`=?' 注意这里要使用time()方法,否则会直接返回个时间戳对象,而不是float值
    u = User(id='00145318300622886f186530ee74afabecedb42f9cd590a000', created_at=time.time(), passwd='test',
             image='about:blank', admin=True, name='test', email='hello1@example.com')  # id必须和数据库一直,其他属性可以设置成新的值,属性要全
    # pdb.set_trace()
    yield from u.update()


@asyncio.coroutine
def execute_test(loop):
    yield from loop.run_until_complete(test_update(loop))  # 执行协程
    yield from loop.close()


loop = asyncio.get_event_loop()  # 获取消息循环对象
execute_test(loop)

关闭事件循环之前,需要关闭连接池,见docs

pool.close()
yield from pool.wait_closed()

你的情况:

loop = asyncio.get_event_loop()
loop.run_until_complete(test_update(loop))
__pool.close()
loop.run_until_complete(__pool.wait_closed())
loop.close()