mysql 从芹菜执行插入时命令不同步
mysql command out of sync when executing insert from celery
我 运行 在使用自定义数据库库和芹菜时遇到可怕的 MySQL 命令不同步。
图书馆如下:
import pymysql
import pymysql.cursors
from furl import furl
from flask import current_app
class LegacyDB:
"""Db
Legacy Database connectivity library
"""
def __init__(self,app):
with app.app_context():
self.rc = current_app.config['RAVEN']
self.logger = current_app.logger
self.data = {}
# setup Mysql
try:
uri = furl(current_app.config['DBCX'])
self.dbcx = pymysql.connect(
host=uri.host,
user=uri.username,
passwd=uri.password,
db=str(uri.path.segments[0]),
port=int(uri.port),
cursorclass=pymysql.cursors.DictCursor
)
except:
self.rc.captureException()
def query(self, sql, params = None, TTL=36):
# INPUT 1 : SQL query
# INPUT 2 : Parameters
# INPUT 3 : Time To Live
# OUTPUT : Array of result
# check that we're still connected to the
# database before we fire off the query
try:
db_cursor = self.dbcx.cursor()
if params:
self.logger.debug("%s : %s" % (sql, params))
db_cursor.execute(sql,params)
self.dbcx.commit()
else:
self.logger.debug("%s" % sql)
db_cursor.execute(sql)
self.data = db_cursor.fetchall()
if self.data == None:
self.data = {}
db_cursor.close()
except Exception as ex:
if ex[0] == "2006":
db_cursor.close()
self.connect()
db_cursor = self.dbcx.cursor()
if params:
db_cursor.execute(sql,params)
self.dbcx.commit()
else:
db_cursor.execute(sql)
self.data = db_cursor.fetchall()
db_cursor.close()
else:
self.rc.captureException()
return self.data
该库的目的是在我将遗留数据库模式从基于 C++ 的系统迁移到基于 Python 的系统时与 SQLAlchemy 一起工作。
所有配置都是通过 Flask 应用程序完成的,app.config['DBCX'] 值读取与 SQLAlchemy 字符串相同 ("mysql://user:pass@host:port/dbname") 允许我在未来轻松切换.
我有许多任务 运行 "INSERT" 通过 celery 声明,所有这些任务都使用这个库。你可以想象,运行ning Celery 的主要原因是我可以增加这个应用程序的吞吐量,但是我似乎在一段时间后(大约500 条已处理的消息)我在日志中看到以下内容:
Stacktrace (most recent call last):
File "legacy/legacydb.py", line 49, in query
self.dbcx.commit()
File "pymysql/connections.py", line 662, in commit
self._read_ok_packet()
File "pymysql/connections.py", line 643, in _read_ok_packet
raise OperationalError(2014, "Command Out of Sync")
我显然做错了什么来解决这个错误,但是 MySQL 是否有自动提交 enabled/disabled 或者我把 connection.commit() 放在哪里似乎并不重要打电话。
如果我遗漏了 connection.commit() 那么我就不会将任何内容插入到数据库中。
我最近从 mysqldb 转移到 pymysql 并且出现次数似乎较低,但是考虑到这些是简单的 "insert" 命令而不是复杂的 select (甚至没有任何此数据库的外键约束!)我正在努力找出问题所在。
就目前情况而言,我无法使用 executemany,因为我无法提前准备语句(我从 "firehose" 消息队列中提取数据并将其存储在本地以供以后处理)。
首先,确保celery
thingamajig 使用自己的连接,因为
>>> pymysql.threadsafety
1
init 是调用一次,还是每个 worker 调用一次?如果只有一次,你需要移动初始化。
第一次调用查询时,在线程局部变量中延迟初始化连接怎么样?
我 运行 在使用自定义数据库库和芹菜时遇到可怕的 MySQL 命令不同步。
图书馆如下:
import pymysql
import pymysql.cursors
from furl import furl
from flask import current_app
class LegacyDB:
"""Db
Legacy Database connectivity library
"""
def __init__(self,app):
with app.app_context():
self.rc = current_app.config['RAVEN']
self.logger = current_app.logger
self.data = {}
# setup Mysql
try:
uri = furl(current_app.config['DBCX'])
self.dbcx = pymysql.connect(
host=uri.host,
user=uri.username,
passwd=uri.password,
db=str(uri.path.segments[0]),
port=int(uri.port),
cursorclass=pymysql.cursors.DictCursor
)
except:
self.rc.captureException()
def query(self, sql, params = None, TTL=36):
# INPUT 1 : SQL query
# INPUT 2 : Parameters
# INPUT 3 : Time To Live
# OUTPUT : Array of result
# check that we're still connected to the
# database before we fire off the query
try:
db_cursor = self.dbcx.cursor()
if params:
self.logger.debug("%s : %s" % (sql, params))
db_cursor.execute(sql,params)
self.dbcx.commit()
else:
self.logger.debug("%s" % sql)
db_cursor.execute(sql)
self.data = db_cursor.fetchall()
if self.data == None:
self.data = {}
db_cursor.close()
except Exception as ex:
if ex[0] == "2006":
db_cursor.close()
self.connect()
db_cursor = self.dbcx.cursor()
if params:
db_cursor.execute(sql,params)
self.dbcx.commit()
else:
db_cursor.execute(sql)
self.data = db_cursor.fetchall()
db_cursor.close()
else:
self.rc.captureException()
return self.data
该库的目的是在我将遗留数据库模式从基于 C++ 的系统迁移到基于 Python 的系统时与 SQLAlchemy 一起工作。
所有配置都是通过 Flask 应用程序完成的,app.config['DBCX'] 值读取与 SQLAlchemy 字符串相同 ("mysql://user:pass@host:port/dbname") 允许我在未来轻松切换.
我有许多任务 运行 "INSERT" 通过 celery 声明,所有这些任务都使用这个库。你可以想象,运行ning Celery 的主要原因是我可以增加这个应用程序的吞吐量,但是我似乎在一段时间后(大约500 条已处理的消息)我在日志中看到以下内容:
Stacktrace (most recent call last):
File "legacy/legacydb.py", line 49, in query
self.dbcx.commit()
File "pymysql/connections.py", line 662, in commit
self._read_ok_packet()
File "pymysql/connections.py", line 643, in _read_ok_packet
raise OperationalError(2014, "Command Out of Sync")
我显然做错了什么来解决这个错误,但是 MySQL 是否有自动提交 enabled/disabled 或者我把 connection.commit() 放在哪里似乎并不重要打电话。
如果我遗漏了 connection.commit() 那么我就不会将任何内容插入到数据库中。
我最近从 mysqldb 转移到 pymysql 并且出现次数似乎较低,但是考虑到这些是简单的 "insert" 命令而不是复杂的 select (甚至没有任何此数据库的外键约束!)我正在努力找出问题所在。
就目前情况而言,我无法使用 executemany,因为我无法提前准备语句(我从 "firehose" 消息队列中提取数据并将其存储在本地以供以后处理)。
首先,确保celery
thingamajig 使用自己的连接,因为
>>> pymysql.threadsafety
1
init 是调用一次,还是每个 worker 调用一次?如果只有一次,你需要移动初始化。
第一次调用查询时,在线程局部变量中延迟初始化连接怎么样?