Python 脚本数据库连接作为池不工作,但简单连接工作

Python script DB connection as Pool not working, but simple connection is working

我正在 python 3 中编写一个脚本,它正在侦听隧道并在 MySQL 中保存和更新数据,具体取决于收到的消息。

我遇到了奇怪的行为,我使用 pymysql 模块与 MySQL 建立了一个简单的连接,一切正常,但过了一段时间这个简单的连接关闭了。

所以我决定实现 MySQL 的池连接,问题就出现了。没有发生任何错误,但问题如下:

我的游标= yield self._pool.execute(query, list(filters.values()))

游标结果= tornado_mysql.pools.Pool 对象在 0x0000019DE5D71F98

像这样堆叠,什么都不做

如果我从光标中删除 yield 通过该行,下一行将抛出错误

响应= 产量c.fetchall()

AttributeError: 'Future' 对象没有属性 'fetchall'

如何修复 MySQL 池连接以使其正常工作?

我尝试了什么:

  1. 我使用了几个模块来连接池,所有模块都在同一个问题中

  2. 恢复了与 pymysql 的简单连接并再次工作

下面是我的代码:

python脚本文件

import pika
from model import SyncModel

_model = SyncModel(conf, _server_id)

@coroutine
def main():
    credentials = pika.PlainCredentials('user', 'password')

    try:
        cp = pika.ConnectionParameters(
            host='127.0.0.1',
            port=5671,
            credentials=credentials,
            ssl=False,
        )

        connection = pika.BlockingConnection(cp)
        channel = connection.channel()

        @coroutine
        def callback(ch, method, properties, body):
            if 'messageType' in properties.headers:
                message_type = properties.headers['messageType']
                if message_type in allowed_message_types:
                    result = ptoto_file._reflection.ParseMessage(descriptors[message_type], body)
                    if result:
                        result = protobuf_to_dict(result)
                        if message_type == 'MyMessage':
                            yield _model.message_event(data=result)


                else:
                    print('Message type not in allowed list = ' + str(message_type))
                    print('continue listening...')

        channel.basic_consume(callback, queue='queue', no_ack=True)

        print(' [*] Waiting for messages. To exit press CTRL+C')
        channel.start_consuming()
    except Exception as e:
        print('Could not connect to host 127.0.0.1 on port 5671')
        print(str(e))


if __name__ == '__main__':
    main()

同步模型

from tornado_mysql import pools  
from tornado.gen import coroutine, Return
from tornado_mysql.cursors import DictCursor 

class SyncModel(object):
    def __init__(self, conf, server_id):
        self.conf = conf
        servers = [i for i in conf.mysql.servers]

        for s in servers:
            if s['server_id'] == server_id:
                // s hold all data as, host, user, port, autocommit, charset, db, password
                s['cursorclass'] = DictCursor                
                self._pool = pools.Pool(s, max_idle_connections=1, max_recycle_sec=3)

    @coroutine
    def message_event(self, data):
        table_name = 'table_name'
        query = ''
        data = data['message']

        filters = {
            'id': data['id']
        }

        // here the connection fails as describe above
        response = yield self.query_select(table_name, self._pool, filters=filters)


    @coroutine
    def query_select(self, table_name, _pool, filters=None):
        if filters is None:
            filters = {}

        combined_filters = ['`%s` = %%s' % i for i in filters.keys()]
        where = 'WHERE ' + ' AND '.join(combined_filters) if combined_filters else ''
        query = """SELECT * FROM `%s` %s""" % (table_name, where)
        c = self._pool.execute(query, list(filters.values()))

        response = yield c.fetchall()

        raise Return({response})

所有代码仅通过与数据库的简单连接工作,在我开始使用池示例后不再工作。在此问题上将不胜感激。

这是一个独立的脚本。

池连接不工作,所以切换回 pymysql 仔细检查连接

我想 post 我的答案有效,只有这个解决方案对我有效

  1. 在连接到mysql之前检查连接是否打开,如果没有重新连接

    if not self.mysql.open:
        self.mysql.ping(reconnect=True)