SQLAlchemy+PyMysql:如何从过时的连接错误中恢复 (pymysql.err.InternalError)

SQLAlchemy+PyMysql: how to recover from stale connection error (pymysql.err.InternalError)

我显然似乎遗漏了一些东西:我希望 SQLAlchemy 通过在 'checkout' 重新创建它们来从陈旧的连接中恢复。

我的应用正在使用 SQLAlchemy ORM(严格来说是 ORM,根本不使用表达式语言)连接并与 MYSQL 数据库通信。当一段时间没有 activity 时,池中的连接似乎变得陈旧并生成

sqlalchemy.exc.InternalError: (pymysql.err.InternalError) (1046, 'No database selected') 

我试过设置 pre-ping 在 create_engine 中也有 pool_recycle。

我创建了一个示例来对此进行测试:请注意 pool_recycle 积极尝试更快地产生错误情况(在我的应用程序中,它是 3600)

# my test program in .py file
# - fully working code - BUT replace name/pwd/Dbserver in create_engine
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker
from sqlalchemy import create_engine, event, exc
from sqlalchemy import MetaData, Column, Integer, DateTime
from datetime import datetime
from time import sleep
import pymysql

engine = create_engine('mysql+pymysql://name:pwd@127.0.0.1',
                       pool_pre_ping=True, pool_recycle=1)

Session = sessionmaker(bind=engine)

if __name__ == '__main__':
    engine.execute('DROP DATABASE IF EXISTS satest')
    # create and use the DB
    engine.execute('CREATE DATABASE IF NOT EXISTS satest')
    engine.execute('USE satest')

    # Prepare for creating tables
    metadata = MetaData(engine)
    # session = scoped_session(sessionmaker(bind=engine))
    base = declarative_base()


class TblFoo(base):
     __tablename__ = 'Foo'
    id = Column(Integer, primary_key=True, nullable=False)
    counter = Column(Integer)
    dateTime = Column(DateTime)


def createFooRecords():
    count = 1
    while count < 1000000:
        realCount = count
        try:
            global Session
            session = Session()
            rec = TblFoo(counter=count,
                         dateTime=datetime.today().isoformat())
            session.add(rec)
            session.commit()
            count += 1
        finally:
            session.close()

        if realCount != count:
            sleep(realCount*5)


@event.listens_for(engine, "handle_error")
def receive_handle_error(exception_context):
    print('Handle Error: ', exception_context)
    if isinstance(exception_context.original_exception, 
                  pymysql.err.InternalError):
        if str(exception_context.original_exception) == '1046':
            print('Internal Error: ')
        else:
            print('Some other error: ', 
                   exception_context.original_exception)


@event.listens_for(engine, 'invalidate')
def receive_invalidate(dbapi_connection, connection_record, exception):
    print('DBAPI connection: ', dbapi_connection)
    print('Conn Record: ', connection_record)
    print('Exception: ', exception)


if __name__ == '__main__':
    base.metadata.create_all(engine)
    createFooRecords()

当然,engine.pool 中的连接在 1 秒后超时,第二次调用 session.add(rec) 失败。

"handle_error" 确实被调用了。

Handle Error:  <sqlalchemy.engine.base.ExceptionContextImpl object at 0x1093311d0>
Some other error:  (1046, 'No database selected')
Traceback (most recent call last):
.... stacktrace omitted for brevity, finally .... 
ssqlalchemy.exc.InternalError: (pymysql.err.InternalError) (1046, 'No database selected') [SQL: 'INSERT INTO `Foo` (counter, `dateTime`) VALUES (%(counter)s, %(dateTime)s)'] [parameters: {'counter': 2, 'dateTime': '2019-08-28T15:07:45.184456'}] (Background on this error at: http://sqlalche.me/e/2j85)

"invalidate" 永远不会被调用。

那么为什么 SQLAlchemy 不重新创建连接,因为它很清楚它会在 1 秒内过期(pool_recycle 值)。 (不清楚它是否正在执行预 ping - 我如何验证? - 但如果它发生了,它仍然没有重新创建连接。)

诚然,错误来自 PyMysql,但设置在 SQLAlchemy 中。

当然,我想要这个可恢复的,所以我在 try-catch 中添加了一个例外:但也许重新创建引擎不是正确的方法。

def createFooRecords():
    count = 1
    while count < 1000000:
        realCount = count
        try:
            global Session
            session = Session()
            rec = TblFoo(counter=count, 
                         dateTime=datetime.today().isoformat())
            session.add(rec)
            session.commit()
            count += 1
        except exc.InternalError as e:
            print('{} Exception: {}'.format(count, e))
            global engine
            engine.dispose()
            engine = create_engine('mysql+pymysql://name:pwd@127.0.0.1',
                                   pool_pre_ping=True, pool_recycle=1)
            Session = sessionmaker(bind=engine)
            continue
        except Exception as e2:
            print('Error', e2)
            raise
        finally:
            session.close()

        if realCount != count:
            sleep(realCount*5)



Handle Error:  <sqlalchemy.engine.base.ExceptionContextImpl object at 0x1076b6fd0>
Some other error:  (1046, 'No database selected')
2 Exception: (pymysql.err.InternalError) (1046, 'No database selected') [SQL: 'INSERT INTO `Foo` (counter, `dateTime`) VALUES (%(counter)s, %(dateTime)s)'] [parameters: {'counter': 2, 'dateTime': '2019-08-28T15:09:47.498157'}] (Background on this error at: http://sqlalche.me/e/2j85)
2 Exception: (pymysql.err.InternalError) (1046, 'No database selected') [SQL: 'INSERT INTO `Foo` (counter, `dateTime`) VALUES (%(counter)s, %(dateTime)s)'] [parameters: {'counter': 2, 'dateTime': '2019-08-28T15:09:47.503714'}] (Background on this error at: http://sqlalche.me/e/2j85)

...以及当 'count' 永远不会递增时人们所期望的无数这样的流。

那么网络网络,如何从失效连接中恢复(最好在结帐时)。我有一个相当复杂的应用程序,它只使用 ORM,并且有很多数据库(阅读:ORM Table)读写分散在整个过程中,所以即使是其他建议,如 lambda 或 运行 查询的中心函数,等不是很实用(或者应该是?)欢迎提出建议。

您遇到的问题是因为您没有在连接中定义数据库名称 url。记录的模式是 dialect+driver://username:password@host:port/database,您提供 'mysql+pymysql://name:pwd@127.0.0.1'。所以你提供了方言、驱动程序、用户名、密码和主机,但没有数据库。

在这段代码中:

if __name__ == '__main__':
    engine.execute('DROP DATABASE IF EXISTS satest')
    # create and use the DB
    engine.execute('CREATE DATABASE IF NOT EXISTS satest')
    engine.execute('USE satest')

    # Prepare for creating tables
    metadata = MetaData(engine)
    # session = scoped_session(sessionmaker(bind=engine))
    base = declarative_base()

... 每次启动应用程序时首先删除整个数据库,重新创建它,然后手动发出 USE satest 查询。问题是,USE satest 命令仅适用于从池中签出的单个连接。只要池 returns 你有相同的连接,它就会一直存在,但是一旦创建了新连接,它就没有向它发出 USE satest 命令。

我的建议是创建一次数据库并将其留在那里。然后,将上面的代码更改为:

...
engine = create_engine('mysql+pymysql://name:pwd@127.0.0.1/satest')
...

if __name__ == '__main__':
    # Prepare for creating tables
    metadata = MetaData(engine)
    # session = scoped_session(sessionmaker(bind=engine))
    base = declarative_base()
    base.metadata.drop_all()
    base.metadata.create_all()

如果你真的想继续做你正在做的事情,pymysql 连接接受一个 init_command 参数,根据他们的文档,该参数是:

Initial SQL statement to run when connection is established.

因此您可以将 "USE satest" 命令传递给该连接参数。您可以阅读如何做到这一点 here,但一个示例可能是:

engine = create_engine('mysql+pymysql://name:pwd@127.0.0.1/satest', connect_args={"init_command": "USE satest"})