SQLAlchemy+PyMysql:如何从过时的连接错误中恢复 (pymysql.err.InternalError)
SQLAlchemy+PyMysql: how to recover from stale connection error (pymysql.err.InternalError)
我显然似乎遗漏了一些东西:我希望 SQLAlchemy 通过在 'checkout' 重新创建它们来从陈旧的连接中恢复。
我的应用正在使用 SQLAlchemy ORM(严格来说是 ORM,根本不使用表达式语言)连接并与 MYSQL 数据库通信。当一段时间没有 activity 时,池中的连接似乎变得陈旧并生成
sqlalchemy.exc.InternalError: (pymysql.err.InternalError) (1046, 'No database selected')
我试过设置 pre-ping 在 create_engine 中也有 pool_recycle。
我创建了一个示例来对此进行测试:请注意 pool_recycle 积极尝试更快地产生错误情况(在我的应用程序中,它是 3600)
# my test program in .py file
# - fully working code - BUT replace name/pwd/Dbserver in create_engine
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker
from sqlalchemy import create_engine, event, exc
from sqlalchemy import MetaData, Column, Integer, DateTime
from datetime import datetime
from time import sleep
import pymysql
engine = create_engine('mysql+pymysql://name:pwd@127.0.0.1',
pool_pre_ping=True, pool_recycle=1)
Session = sessionmaker(bind=engine)
if __name__ == '__main__':
engine.execute('DROP DATABASE IF EXISTS satest')
# create and use the DB
engine.execute('CREATE DATABASE IF NOT EXISTS satest')
engine.execute('USE satest')
# Prepare for creating tables
metadata = MetaData(engine)
# session = scoped_session(sessionmaker(bind=engine))
base = declarative_base()
class TblFoo(base):
__tablename__ = 'Foo'
id = Column(Integer, primary_key=True, nullable=False)
counter = Column(Integer)
dateTime = Column(DateTime)
def createFooRecords():
count = 1
while count < 1000000:
realCount = count
try:
global Session
session = Session()
rec = TblFoo(counter=count,
dateTime=datetime.today().isoformat())
session.add(rec)
session.commit()
count += 1
finally:
session.close()
if realCount != count:
sleep(realCount*5)
@event.listens_for(engine, "handle_error")
def receive_handle_error(exception_context):
print('Handle Error: ', exception_context)
if isinstance(exception_context.original_exception,
pymysql.err.InternalError):
if str(exception_context.original_exception) == '1046':
print('Internal Error: ')
else:
print('Some other error: ',
exception_context.original_exception)
@event.listens_for(engine, 'invalidate')
def receive_invalidate(dbapi_connection, connection_record, exception):
print('DBAPI connection: ', dbapi_connection)
print('Conn Record: ', connection_record)
print('Exception: ', exception)
if __name__ == '__main__':
base.metadata.create_all(engine)
createFooRecords()
当然,engine.pool 中的连接在 1 秒后超时,第二次调用 session.add(rec) 失败。
"handle_error" 确实被调用了。
Handle Error: <sqlalchemy.engine.base.ExceptionContextImpl object at 0x1093311d0>
Some other error: (1046, 'No database selected')
Traceback (most recent call last):
.... stacktrace omitted for brevity, finally ....
ssqlalchemy.exc.InternalError: (pymysql.err.InternalError) (1046, 'No database selected') [SQL: 'INSERT INTO `Foo` (counter, `dateTime`) VALUES (%(counter)s, %(dateTime)s)'] [parameters: {'counter': 2, 'dateTime': '2019-08-28T15:07:45.184456'}] (Background on this error at: http://sqlalche.me/e/2j85)
"invalidate" 永远不会被调用。
那么为什么 SQLAlchemy 不重新创建连接,因为它很清楚它会在 1 秒内过期(pool_recycle 值)。 (不清楚它是否正在执行预 ping - 我如何验证? - 但如果它发生了,它仍然没有重新创建连接。)
诚然,错误来自 PyMysql,但设置在 SQLAlchemy 中。
当然,我想要这个可恢复的,所以我在 try-catch 中添加了一个例外:但也许重新创建引擎不是正确的方法。
def createFooRecords():
count = 1
while count < 1000000:
realCount = count
try:
global Session
session = Session()
rec = TblFoo(counter=count,
dateTime=datetime.today().isoformat())
session.add(rec)
session.commit()
count += 1
except exc.InternalError as e:
print('{} Exception: {}'.format(count, e))
global engine
engine.dispose()
engine = create_engine('mysql+pymysql://name:pwd@127.0.0.1',
pool_pre_ping=True, pool_recycle=1)
Session = sessionmaker(bind=engine)
continue
except Exception as e2:
print('Error', e2)
raise
finally:
session.close()
if realCount != count:
sleep(realCount*5)
Handle Error: <sqlalchemy.engine.base.ExceptionContextImpl object at 0x1076b6fd0>
Some other error: (1046, 'No database selected')
2 Exception: (pymysql.err.InternalError) (1046, 'No database selected') [SQL: 'INSERT INTO `Foo` (counter, `dateTime`) VALUES (%(counter)s, %(dateTime)s)'] [parameters: {'counter': 2, 'dateTime': '2019-08-28T15:09:47.498157'}] (Background on this error at: http://sqlalche.me/e/2j85)
2 Exception: (pymysql.err.InternalError) (1046, 'No database selected') [SQL: 'INSERT INTO `Foo` (counter, `dateTime`) VALUES (%(counter)s, %(dateTime)s)'] [parameters: {'counter': 2, 'dateTime': '2019-08-28T15:09:47.503714'}] (Background on this error at: http://sqlalche.me/e/2j85)
...以及当 'count' 永远不会递增时人们所期望的无数这样的流。
那么网络网络,如何从失效连接中恢复(最好在结帐时)。我有一个相当复杂的应用程序,它只使用 ORM,并且有很多数据库(阅读:ORM Table)读写分散在整个过程中,所以即使是其他建议,如 lambda 或 运行 查询的中心函数,等不是很实用(或者应该是?)欢迎提出建议。
您遇到的问题是因为您没有在连接中定义数据库名称 url。记录的模式是 dialect+driver://username:password@host:port/database
,您提供 'mysql+pymysql://name:pwd@127.0.0.1'
。所以你提供了方言、驱动程序、用户名、密码和主机,但没有数据库。
在这段代码中:
if __name__ == '__main__':
engine.execute('DROP DATABASE IF EXISTS satest')
# create and use the DB
engine.execute('CREATE DATABASE IF NOT EXISTS satest')
engine.execute('USE satest')
# Prepare for creating tables
metadata = MetaData(engine)
# session = scoped_session(sessionmaker(bind=engine))
base = declarative_base()
... 每次启动应用程序时首先删除整个数据库,重新创建它,然后手动发出 USE satest
查询。问题是,USE satest
命令仅适用于从池中签出的单个连接。只要池 returns 你有相同的连接,它就会一直存在,但是一旦创建了新连接,它就没有向它发出 USE satest
命令。
我的建议是创建一次数据库并将其留在那里。然后,将上面的代码更改为:
...
engine = create_engine('mysql+pymysql://name:pwd@127.0.0.1/satest')
...
if __name__ == '__main__':
# Prepare for creating tables
metadata = MetaData(engine)
# session = scoped_session(sessionmaker(bind=engine))
base = declarative_base()
base.metadata.drop_all()
base.metadata.create_all()
如果你真的想继续做你正在做的事情,pymysql 连接接受一个 init_command
参数,根据他们的文档,该参数是:
Initial SQL statement to run when connection is established.
因此您可以将 "USE satest"
命令传递给该连接参数。您可以阅读如何做到这一点 here,但一个示例可能是:
engine = create_engine('mysql+pymysql://name:pwd@127.0.0.1/satest', connect_args={"init_command": "USE satest"})
我显然似乎遗漏了一些东西:我希望 SQLAlchemy 通过在 'checkout' 重新创建它们来从陈旧的连接中恢复。
我的应用正在使用 SQLAlchemy ORM(严格来说是 ORM,根本不使用表达式语言)连接并与 MYSQL 数据库通信。当一段时间没有 activity 时,池中的连接似乎变得陈旧并生成
sqlalchemy.exc.InternalError: (pymysql.err.InternalError) (1046, 'No database selected')
我试过设置 pre-ping 在 create_engine 中也有 pool_recycle。
我创建了一个示例来对此进行测试:请注意 pool_recycle 积极尝试更快地产生错误情况(在我的应用程序中,它是 3600)
# my test program in .py file
# - fully working code - BUT replace name/pwd/Dbserver in create_engine
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker
from sqlalchemy import create_engine, event, exc
from sqlalchemy import MetaData, Column, Integer, DateTime
from datetime import datetime
from time import sleep
import pymysql
engine = create_engine('mysql+pymysql://name:pwd@127.0.0.1',
pool_pre_ping=True, pool_recycle=1)
Session = sessionmaker(bind=engine)
if __name__ == '__main__':
engine.execute('DROP DATABASE IF EXISTS satest')
# create and use the DB
engine.execute('CREATE DATABASE IF NOT EXISTS satest')
engine.execute('USE satest')
# Prepare for creating tables
metadata = MetaData(engine)
# session = scoped_session(sessionmaker(bind=engine))
base = declarative_base()
class TblFoo(base):
__tablename__ = 'Foo'
id = Column(Integer, primary_key=True, nullable=False)
counter = Column(Integer)
dateTime = Column(DateTime)
def createFooRecords():
count = 1
while count < 1000000:
realCount = count
try:
global Session
session = Session()
rec = TblFoo(counter=count,
dateTime=datetime.today().isoformat())
session.add(rec)
session.commit()
count += 1
finally:
session.close()
if realCount != count:
sleep(realCount*5)
@event.listens_for(engine, "handle_error")
def receive_handle_error(exception_context):
print('Handle Error: ', exception_context)
if isinstance(exception_context.original_exception,
pymysql.err.InternalError):
if str(exception_context.original_exception) == '1046':
print('Internal Error: ')
else:
print('Some other error: ',
exception_context.original_exception)
@event.listens_for(engine, 'invalidate')
def receive_invalidate(dbapi_connection, connection_record, exception):
print('DBAPI connection: ', dbapi_connection)
print('Conn Record: ', connection_record)
print('Exception: ', exception)
if __name__ == '__main__':
base.metadata.create_all(engine)
createFooRecords()
当然,engine.pool 中的连接在 1 秒后超时,第二次调用 session.add(rec) 失败。
"handle_error" 确实被调用了。
Handle Error: <sqlalchemy.engine.base.ExceptionContextImpl object at 0x1093311d0>
Some other error: (1046, 'No database selected')
Traceback (most recent call last):
.... stacktrace omitted for brevity, finally ....
ssqlalchemy.exc.InternalError: (pymysql.err.InternalError) (1046, 'No database selected') [SQL: 'INSERT INTO `Foo` (counter, `dateTime`) VALUES (%(counter)s, %(dateTime)s)'] [parameters: {'counter': 2, 'dateTime': '2019-08-28T15:07:45.184456'}] (Background on this error at: http://sqlalche.me/e/2j85)
"invalidate" 永远不会被调用。
那么为什么 SQLAlchemy 不重新创建连接,因为它很清楚它会在 1 秒内过期(pool_recycle 值)。 (不清楚它是否正在执行预 ping - 我如何验证? - 但如果它发生了,它仍然没有重新创建连接。)
诚然,错误来自 PyMysql,但设置在 SQLAlchemy 中。
当然,我想要这个可恢复的,所以我在 try-catch 中添加了一个例外:但也许重新创建引擎不是正确的方法。
def createFooRecords():
count = 1
while count < 1000000:
realCount = count
try:
global Session
session = Session()
rec = TblFoo(counter=count,
dateTime=datetime.today().isoformat())
session.add(rec)
session.commit()
count += 1
except exc.InternalError as e:
print('{} Exception: {}'.format(count, e))
global engine
engine.dispose()
engine = create_engine('mysql+pymysql://name:pwd@127.0.0.1',
pool_pre_ping=True, pool_recycle=1)
Session = sessionmaker(bind=engine)
continue
except Exception as e2:
print('Error', e2)
raise
finally:
session.close()
if realCount != count:
sleep(realCount*5)
Handle Error: <sqlalchemy.engine.base.ExceptionContextImpl object at 0x1076b6fd0>
Some other error: (1046, 'No database selected')
2 Exception: (pymysql.err.InternalError) (1046, 'No database selected') [SQL: 'INSERT INTO `Foo` (counter, `dateTime`) VALUES (%(counter)s, %(dateTime)s)'] [parameters: {'counter': 2, 'dateTime': '2019-08-28T15:09:47.498157'}] (Background on this error at: http://sqlalche.me/e/2j85)
2 Exception: (pymysql.err.InternalError) (1046, 'No database selected') [SQL: 'INSERT INTO `Foo` (counter, `dateTime`) VALUES (%(counter)s, %(dateTime)s)'] [parameters: {'counter': 2, 'dateTime': '2019-08-28T15:09:47.503714'}] (Background on this error at: http://sqlalche.me/e/2j85)
...以及当 'count' 永远不会递增时人们所期望的无数这样的流。
那么网络网络,如何从失效连接中恢复(最好在结帐时)。我有一个相当复杂的应用程序,它只使用 ORM,并且有很多数据库(阅读:ORM Table)读写分散在整个过程中,所以即使是其他建议,如 lambda 或 运行 查询的中心函数,等不是很实用(或者应该是?)欢迎提出建议。
您遇到的问题是因为您没有在连接中定义数据库名称 url。记录的模式是 dialect+driver://username:password@host:port/database
,您提供 'mysql+pymysql://name:pwd@127.0.0.1'
。所以你提供了方言、驱动程序、用户名、密码和主机,但没有数据库。
在这段代码中:
if __name__ == '__main__':
engine.execute('DROP DATABASE IF EXISTS satest')
# create and use the DB
engine.execute('CREATE DATABASE IF NOT EXISTS satest')
engine.execute('USE satest')
# Prepare for creating tables
metadata = MetaData(engine)
# session = scoped_session(sessionmaker(bind=engine))
base = declarative_base()
... 每次启动应用程序时首先删除整个数据库,重新创建它,然后手动发出 USE satest
查询。问题是,USE satest
命令仅适用于从池中签出的单个连接。只要池 returns 你有相同的连接,它就会一直存在,但是一旦创建了新连接,它就没有向它发出 USE satest
命令。
我的建议是创建一次数据库并将其留在那里。然后,将上面的代码更改为:
...
engine = create_engine('mysql+pymysql://name:pwd@127.0.0.1/satest')
...
if __name__ == '__main__':
# Prepare for creating tables
metadata = MetaData(engine)
# session = scoped_session(sessionmaker(bind=engine))
base = declarative_base()
base.metadata.drop_all()
base.metadata.create_all()
如果你真的想继续做你正在做的事情,pymysql 连接接受一个 init_command
参数,根据他们的文档,该参数是:
Initial SQL statement to run when connection is established.
因此您可以将 "USE satest"
命令传递给该连接参数。您可以阅读如何做到这一点 here,但一个示例可能是:
engine = create_engine('mysql+pymysql://name:pwd@127.0.0.1/satest', connect_args={"init_command": "USE satest"})