在 Flask 中使用 pymysql 时出错
Error while using pymysql in flask
我正在使用 pymysql 客户端连接到我的烧瓶 API 中的 mysql,
几天(大约 1-2 天)一切正常,然后突然开始抛出此错误
Traceback (most recent call last):
File "/usr/local/lib/python3.4/dist-packages/pymysql/connections.py", line 1039, in _write_bytes
self._sock.sendall(data)
TimeoutError: [Errno 110] Connection timed out
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "Main.py", line 194, in post
result={'resultCode':100,'resultDescription':'SUCCESS','result':self.getStudentATData(studentId,args['chapterId'])}
File "Main.py", line 176, in getStudentATData
cur.execute("my query")
File "/usr/local/lib/python3.4/dist-packages/pymysql/cursors.py", line 166, in execute
result = self._query(query)
File "/usr/local/lib/python3.4/dist-packages/pymysql/cursors.py", line 322, in _query
conn.query(q)
File "/usr/local/lib/python3.4/dist-packages/pymysql/connections.py", line 855, in query
self._execute_command(COMMAND.COM_QUERY, sql)
File "/usr/local/lib/python3.4/dist-packages/pymysql/connections.py", line 1092, in _execute_command
self._write_bytes(packet)
File "/usr/local/lib/python3.4/dist-packages/pymysql/connections.py", line 1044, in _write_bytes
"MySQL server has gone away (%r)" % (e,))
pymysql.err.OperationalError: (2006, "MySQL server has gone away (TimeoutError(110, 'Connection timed out'))")
如果重新启动应用程序它再次工作正常,我已经尝试了一切但似乎无法摆脱这个,任何人都可以帮忙吗?
按照建议,我实施了一种重试机制,但这并没有解决问题
def connect(self):
#db connect here
def cursor(self):
try:
cursor = self.conn.cursor()
except Exception as e:
print(e)
self.connect()
cursor = self.conn.cursor()
return cursor
像 DB().cursor() 一样使用它
我不认为这是 Flask/pymysql 的问题,因为这是您的 MySQL 超时配置的症状。我假设这是某种云数据库实例?
看看这个:
https://dba.stackexchange.com/questions/1558/how-long-is-too-long-for-mysql-connections-to-sleep
我会 post 你的问题和你的设置细节,你可能会得到配置答案。
Python 解决方案是使用 sqlalchemy 和 flask-sqlalchemy 之类的东西,然后设置配置变量 SQLALCHEMY_POOL_RECYCLE = 3600
以在一小时后(或您想要的任何值)回收连接。或者,如果您不想在项目中添加那么多内容,您可以实现连接 "timer" 功能以在后台自行回收连接:
from datetime import datetime, timedelta
class MyConnectionPool(object)
"""Class that returns a database connection <= 1 hour old"""
refresh_time = timedelta(hours=1)
def __init__(self, host, user, pass):
self.host = host
self.user = user
self.pass = pass
self.db = self._get_connection()
@property
def connection(self):
if self.refresh <= datetime.now():
self.db = self._get_connection()
return self.db
def _get_connection(self):
self.refresh = datetime.now() + self.refresh_time
return pymysql.connect(
host=self.host,
user=self.user,
passwd=self.pass
)
您是否尝试过 dB ping,如果在每次通话前重新连接失败?
我在 flask 中发现的另一件事是,如果我在每次调用后不关闭连接,我最终会遇到这样的情况。
抱歉,缺少详细信息,但我是在 phone 上输入的,并且滚动浏览了您的所有代码,这很有效:-)
class MyDatabase():
def __init__(self, host, user, passwd, db, charset):
self.host = host
self.usr = user
self.passwd = passwd
self.db = db
self.curclass = pymysql.cursors.DictCursor
self.charset = charset
self.connection = pymysql.connect(host=self.host, user=self.usr, passwd=self.passwd, db=self.db,
cursorclass=self.curclass, charset=self.charset)
def get_keywords(self):
self.connection.connect()
cur = self.connection.cursor()
sql = """
SELECT * FROM input_keywords
"""
rows = None
try:
cur.execute(sql)
rows = cur.fetchall()
except Exception as e:
print(e)
self.connection.rollback()
finally:
cur.close()
self.connection.commit()
self.connection.close()
return rows
这会让 Flask 为每个请求创建一个连接,并在最后关闭它。
所以我调用的任何方法都使用这种模式。这也允许多个请求等(网站这样做)
我并不是说它是完美的,但是对于对数据库的每个请求,您都在创建和关闭数据库连接,因此它永远不会超时。
这是非常基本的,同样,您可以将它与 ping() 结合使用并允许它创建新连接等
据我所知,您有两个选择:
为每个查询创建新连接,然后关闭它。像这样:
def db_execute(query):
conn = MySQLdb.connect(*)
cur = conn.cursor()
cur.execute(query)
res = cur.fetchall()
cur.close()
conn.close()
return res
- 更好的方法是像 SqlAlchemy.pool 那样使用带有 pool_pre_ping 参数和自定义连接函数的连接池。
首先,您需要决定是否要保持与MySQL的持久连接。后者性能更好,但需要一点维护。
MySQL 中的默认 wait_timeout
是 8 小时。每当连接空闲时间超过 wait_timeout
时,它就会关闭。当 MySQL 服务器重新启动时,它还会关闭所有已建立的连接。因此,如果您使用持久连接,则需要在使用连接之前检查它是否存在(如果不存在,则重新连接)。如果使用 per request 连接,则不需要维护连接状态,因为连接总是新鲜的。
每个请求连接
对于每个传入的 HTTP 请求,非持久性数据库连接具有明显的打开连接、握手等(对于数据库服务器和客户端)的开销。
引用Flask官方教程regarding database connections:
Creating and closing database connections all the time is very inefficient, so you will need to keep it around for longer. Because database connections encapsulate a transaction, you will need to make sure that only one request at a time uses the connection. An elegant way to do this is by utilizing the application context.
但是请注意,应用程序上下文 是根据请求初始化的(这有点被效率问题和 Flask 的行话掩盖了)。因此,它仍然非常低效。但是它应该可以解决您的问题。这是应用于 pymysql
:
的建议片段
import pymysql
from flask import Flask, g, request
app = Flask(__name__)
def connect_db():
return pymysql.connect(
user = 'guest', password = '', database = 'sakila',
autocommit = True, charset = 'utf8mb4',
cursorclass = pymysql.cursors.DictCursor)
def get_db():
'''Opens a new database connection per request.'''
if not hasattr(g, 'db'):
g.db = connect_db()
return g.db
@app.teardown_appcontext
def close_db(error):
'''Closes the database connection at the end of request.'''
if hasattr(g, 'db'):
g.db.close()
@app.route('/')
def hello_world():
city = request.args.get('city')
cursor = get_db().cursor()
cursor.execute('SELECT city_id FROM city WHERE city = %s', city)
row = cursor.fetchone()
if row:
return 'City "{}" is #{:d}'.format(city, row['city_id'])
else:
return 'City "{}" not found'.format(city)
持久连接
对于持久连接数据库连接,有两个主要选项。您有一个连接池或将连接映射到工作进程。因为通常 Flask WSGI 应用程序由具有固定线程数的线程服务器(例如 uWSGI)提供服务,线程映射更容易且高效。
有一个包,DBUtils, which implements both, and PersistentDB
用于线程映射连接。
维护持久连接的一个重要注意事项是 t运行sactions。重新连接的 API 是 ping
. It's safe for auto-committing single-statements, but it can be disrupting in between a transaction (a little more details here)。 DBUtils 会处理这个问题,并且应该只在 dbapi.OperationalError
和 dbapi.InternalError
上重新连接(默认情况下,由 failures
控制到 PersistentDB
的初始化程序)在 t[=91= 之外引发]行动。
以上代码段在 PersistentDB
中的效果如下。
import pymysql
from flask import Flask, g, request
from DBUtils.PersistentDB import PersistentDB
app = Flask(__name__)
def connect_db():
return PersistentDB(
creator = pymysql, # the rest keyword arguments belong to pymysql
user = 'guest', password = '', database = 'sakila',
autocommit = True, charset = 'utf8mb4',
cursorclass = pymysql.cursors.DictCursor)
def get_db():
'''Opens a new database connection per app.'''
if not hasattr(app, 'db'):
app.db = connect_db()
return app.db.connection()
@app.route('/')
def hello_world():
city = request.args.get('city')
cursor = get_db().cursor()
cursor.execute('SELECT city_id FROM city WHERE city = %s', city)
row = cursor.fetchone()
if row:
return 'City "{}" is #{:d}'.format(city, row['city_id'])
else:
return 'City "{}" not found'.format(city)
微基准测试
为了从数字上给出一些性能影响的线索,这里是微基准。
我运行:
uwsgi --http :5000 --wsgi-file app_persistent.py --callable app --master --processes 1 --threads 16
uwsgi --http :5000 --wsgi-file app_per_req.py --callable app --master --processes 1 --threads 16
并通过并发 1、4、8、16 对它们进行负载测试:
siege -b -t 15s -c 16 http://localhost:5000/?city=london
观察结果(针对我的本地配置):
- 持久连接快约 30%,
- 在并发性 4 和更高版本上,uWSGI 工作进程的峰值超过 CPU 利用率的 100%(
pymysql
必须解析 MySQL 纯 Python 协议,这是瓶颈),
- 在并发 16 时,
mysqld
的 CPU 利用率对于每个请求约为 55%,对于持久连接约为 45%。
我正在使用 pymysql 客户端连接到我的烧瓶 API 中的 mysql, 几天(大约 1-2 天)一切正常,然后突然开始抛出此错误
Traceback (most recent call last):
File "/usr/local/lib/python3.4/dist-packages/pymysql/connections.py", line 1039, in _write_bytes
self._sock.sendall(data)
TimeoutError: [Errno 110] Connection timed out
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "Main.py", line 194, in post
result={'resultCode':100,'resultDescription':'SUCCESS','result':self.getStudentATData(studentId,args['chapterId'])}
File "Main.py", line 176, in getStudentATData
cur.execute("my query")
File "/usr/local/lib/python3.4/dist-packages/pymysql/cursors.py", line 166, in execute
result = self._query(query)
File "/usr/local/lib/python3.4/dist-packages/pymysql/cursors.py", line 322, in _query
conn.query(q)
File "/usr/local/lib/python3.4/dist-packages/pymysql/connections.py", line 855, in query
self._execute_command(COMMAND.COM_QUERY, sql)
File "/usr/local/lib/python3.4/dist-packages/pymysql/connections.py", line 1092, in _execute_command
self._write_bytes(packet)
File "/usr/local/lib/python3.4/dist-packages/pymysql/connections.py", line 1044, in _write_bytes
"MySQL server has gone away (%r)" % (e,))
pymysql.err.OperationalError: (2006, "MySQL server has gone away (TimeoutError(110, 'Connection timed out'))")
如果重新启动应用程序它再次工作正常,我已经尝试了一切但似乎无法摆脱这个,任何人都可以帮忙吗? 按照建议,我实施了一种重试机制,但这并没有解决问题
def connect(self):
#db connect here
def cursor(self):
try:
cursor = self.conn.cursor()
except Exception as e:
print(e)
self.connect()
cursor = self.conn.cursor()
return cursor
像 DB().cursor() 一样使用它
我不认为这是 Flask/pymysql 的问题,因为这是您的 MySQL 超时配置的症状。我假设这是某种云数据库实例?
看看这个:
https://dba.stackexchange.com/questions/1558/how-long-is-too-long-for-mysql-connections-to-sleep
我会 post 你的问题和你的设置细节,你可能会得到配置答案。
Python 解决方案是使用 sqlalchemy 和 flask-sqlalchemy 之类的东西,然后设置配置变量 SQLALCHEMY_POOL_RECYCLE = 3600
以在一小时后(或您想要的任何值)回收连接。或者,如果您不想在项目中添加那么多内容,您可以实现连接 "timer" 功能以在后台自行回收连接:
from datetime import datetime, timedelta
class MyConnectionPool(object)
"""Class that returns a database connection <= 1 hour old"""
refresh_time = timedelta(hours=1)
def __init__(self, host, user, pass):
self.host = host
self.user = user
self.pass = pass
self.db = self._get_connection()
@property
def connection(self):
if self.refresh <= datetime.now():
self.db = self._get_connection()
return self.db
def _get_connection(self):
self.refresh = datetime.now() + self.refresh_time
return pymysql.connect(
host=self.host,
user=self.user,
passwd=self.pass
)
您是否尝试过 dB ping,如果在每次通话前重新连接失败? 我在 flask 中发现的另一件事是,如果我在每次调用后不关闭连接,我最终会遇到这样的情况。
抱歉,缺少详细信息,但我是在 phone 上输入的,并且滚动浏览了您的所有代码,这很有效:-)
class MyDatabase():
def __init__(self, host, user, passwd, db, charset):
self.host = host
self.usr = user
self.passwd = passwd
self.db = db
self.curclass = pymysql.cursors.DictCursor
self.charset = charset
self.connection = pymysql.connect(host=self.host, user=self.usr, passwd=self.passwd, db=self.db,
cursorclass=self.curclass, charset=self.charset)
def get_keywords(self):
self.connection.connect()
cur = self.connection.cursor()
sql = """
SELECT * FROM input_keywords
"""
rows = None
try:
cur.execute(sql)
rows = cur.fetchall()
except Exception as e:
print(e)
self.connection.rollback()
finally:
cur.close()
self.connection.commit()
self.connection.close()
return rows
这会让 Flask 为每个请求创建一个连接,并在最后关闭它。
所以我调用的任何方法都使用这种模式。这也允许多个请求等(网站这样做)
我并不是说它是完美的,但是对于对数据库的每个请求,您都在创建和关闭数据库连接,因此它永远不会超时。
这是非常基本的,同样,您可以将它与 ping() 结合使用并允许它创建新连接等
据我所知,您有两个选择:
为每个查询创建新连接,然后关闭它。像这样:
def db_execute(query): conn = MySQLdb.connect(*) cur = conn.cursor() cur.execute(query) res = cur.fetchall() cur.close() conn.close() return res
- 更好的方法是像 SqlAlchemy.pool 那样使用带有 pool_pre_ping 参数和自定义连接函数的连接池。
首先,您需要决定是否要保持与MySQL的持久连接。后者性能更好,但需要一点维护。
MySQL 中的默认 wait_timeout
是 8 小时。每当连接空闲时间超过 wait_timeout
时,它就会关闭。当 MySQL 服务器重新启动时,它还会关闭所有已建立的连接。因此,如果您使用持久连接,则需要在使用连接之前检查它是否存在(如果不存在,则重新连接)。如果使用 per request 连接,则不需要维护连接状态,因为连接总是新鲜的。
每个请求连接
对于每个传入的 HTTP 请求,非持久性数据库连接具有明显的打开连接、握手等(对于数据库服务器和客户端)的开销。
引用Flask官方教程regarding database connections:
Creating and closing database connections all the time is very inefficient, so you will need to keep it around for longer. Because database connections encapsulate a transaction, you will need to make sure that only one request at a time uses the connection. An elegant way to do this is by utilizing the application context.
但是请注意,应用程序上下文 是根据请求初始化的(这有点被效率问题和 Flask 的行话掩盖了)。因此,它仍然非常低效。但是它应该可以解决您的问题。这是应用于 pymysql
:
import pymysql
from flask import Flask, g, request
app = Flask(__name__)
def connect_db():
return pymysql.connect(
user = 'guest', password = '', database = 'sakila',
autocommit = True, charset = 'utf8mb4',
cursorclass = pymysql.cursors.DictCursor)
def get_db():
'''Opens a new database connection per request.'''
if not hasattr(g, 'db'):
g.db = connect_db()
return g.db
@app.teardown_appcontext
def close_db(error):
'''Closes the database connection at the end of request.'''
if hasattr(g, 'db'):
g.db.close()
@app.route('/')
def hello_world():
city = request.args.get('city')
cursor = get_db().cursor()
cursor.execute('SELECT city_id FROM city WHERE city = %s', city)
row = cursor.fetchone()
if row:
return 'City "{}" is #{:d}'.format(city, row['city_id'])
else:
return 'City "{}" not found'.format(city)
持久连接
对于持久连接数据库连接,有两个主要选项。您有一个连接池或将连接映射到工作进程。因为通常 Flask WSGI 应用程序由具有固定线程数的线程服务器(例如 uWSGI)提供服务,线程映射更容易且高效。
有一个包,DBUtils, which implements both, and PersistentDB
用于线程映射连接。
维护持久连接的一个重要注意事项是 t运行sactions。重新连接的 API 是 ping
. It's safe for auto-committing single-statements, but it can be disrupting in between a transaction (a little more details here)。 DBUtils 会处理这个问题,并且应该只在 dbapi.OperationalError
和 dbapi.InternalError
上重新连接(默认情况下,由 failures
控制到 PersistentDB
的初始化程序)在 t[=91= 之外引发]行动。
以上代码段在 PersistentDB
中的效果如下。
import pymysql
from flask import Flask, g, request
from DBUtils.PersistentDB import PersistentDB
app = Flask(__name__)
def connect_db():
return PersistentDB(
creator = pymysql, # the rest keyword arguments belong to pymysql
user = 'guest', password = '', database = 'sakila',
autocommit = True, charset = 'utf8mb4',
cursorclass = pymysql.cursors.DictCursor)
def get_db():
'''Opens a new database connection per app.'''
if not hasattr(app, 'db'):
app.db = connect_db()
return app.db.connection()
@app.route('/')
def hello_world():
city = request.args.get('city')
cursor = get_db().cursor()
cursor.execute('SELECT city_id FROM city WHERE city = %s', city)
row = cursor.fetchone()
if row:
return 'City "{}" is #{:d}'.format(city, row['city_id'])
else:
return 'City "{}" not found'.format(city)
微基准测试
为了从数字上给出一些性能影响的线索,这里是微基准。
我运行:
uwsgi --http :5000 --wsgi-file app_persistent.py --callable app --master --processes 1 --threads 16
uwsgi --http :5000 --wsgi-file app_per_req.py --callable app --master --processes 1 --threads 16
并通过并发 1、4、8、16 对它们进行负载测试:
siege -b -t 15s -c 16 http://localhost:5000/?city=london
观察结果(针对我的本地配置):
- 持久连接快约 30%,
- 在并发性 4 和更高版本上,uWSGI 工作进程的峰值超过 CPU 利用率的 100%(
pymysql
必须解析 MySQL 纯 Python 协议,这是瓶颈), - 在并发 16 时,
mysqld
的 CPU 利用率对于每个请求约为 55%,对于持久连接约为 45%。