具有多个数据库连接的 lambda 上的 SqlAlchemy 和 pyMysql 连接池
SqlAlchemy & pyMysql connection pooling on a lambda with multiple DB connections
所以问题是我有多个数据库,我想在 SqlAlchemy 中使用同一个数据库池。它驻留在 Lambda 上,池是在 Lambda 启动时创建的。我希望后续的数据库连接使用现有的池。
工作正常的是初始池连接 bpConnect
以及对该连接的任何后续查询。
不起作用的是companyConnect
连接。我收到以下错误:
sqlalchemy.exc.StatementError: (builtins.AttributeError) 'XRaySession' object has no attribute 'cursor'
我有这些联系方式:
# Pooling
import sqlalchemy.pool as pool
#################### Engines ###################################################
def bpGetConnection():
engine_endpoint = f"mysql+pymysql://{os.environ['DB_USERNAME']}:{os.environ['DB_PASSWORD']}@{os.environ['DB_HOST']}:{str(os.environ['DB_PORT'])}/{os.environ['database']}"
engine = create_engine(engine_endpoint, echo_pool=True)
session = XRaySessionMaker(bind=engine, autoflush=True, autocommit=False)
db = session()
return db
bpPool = pool.StaticPool(bpGetConnection)
def companyGetConnection(database):
engine_endpoint = f"mysql+pymysql://{os.environ['DB_USERNAME']}:{os.environ['DB_PASSWORD']}@{os.environ['DB_HOST']}:{str(os.environ['DB_PORT'])}/{database}"
compEngine = create_engine(engine_endpoint, pool=bpPool)
session = XRaySessionMaker(bind=compEngine, autoflush=True, autocommit=False)
db = Session()
return db
#################### POOLING #############################################
def bpConnect():
conn = bpPool.connect()
return conn
def companyConnect(database):
conn = companyGetConnection(database)
return conn
#################################################################
他们在这个例子中被调用:
from connections import companyConnect, bpConnect
from models import Company, Customers
def getCustomers(companyID):
db = bpConnect()
myQuery = db.query(Company).filter(Company.id == companyID).one()
compDB = companyConnect(myQuery.database)
customers = compDB.query(Customers).all()
return customers
我想出了如何在 lambda 上使用动态池来做到这一点:
class DBRegistry(object):
_db = {}
def get(self, url, **kwargs):
if url not in self._db:
engine = create_engine(url, **kwargs)
Session = XRaySessionMaker(bind=engine, autoflush=True, autocommit=False)
session = scoped_session(Session)
self._db[url] = session
return self._db[url]
compDB = DBRegistry()
def bpGetConnection():
engine_endpoint = f"mysql+pymysql://{os.environ['DB_USERNAME']}:{os.environ['DB_PASSWORD']}@{os.environ['DB_HOST']}:{str(os.environ['DB_PORT'])}/{os.environ['database']}?charset=utf8"
engine = create_engine(engine_endpoint)
session = XRaySessionMaker(bind=engine, autoflush=True, autocommit=False)
db = session()
return db
bpPool = pool.QueuePool(bpGetConnection, pool_size=500, timeout=11)
def bpConnect():
conn = bpPool.connect()
return conn
def companyConnect(database):
engine_endpoint = f"mysql+pymysql://{os.environ['DB_USERNAME']}:{os.environ['DB_PASSWORD']}@{os.environ['DB_HOST']}:{str(os.environ['DB_PORT'])}/{database}?charset=utf8"
conn = compDB.get(engine_endpoint, poolclass=QueuePool)
return conn
所以基本上它将使用一个池用于主数据库上所需的持续连接,并使用另一个池动态更改所需的数据库。当需要连接到这些公司数据库之一时,它将检查该池是否已存在于池注册表中。如果池不存在,它将创建一个并注册它。
所以问题是我有多个数据库,我想在 SqlAlchemy 中使用同一个数据库池。它驻留在 Lambda 上,池是在 Lambda 启动时创建的。我希望后续的数据库连接使用现有的池。
工作正常的是初始池连接 bpConnect
以及对该连接的任何后续查询。
不起作用的是companyConnect
连接。我收到以下错误:
sqlalchemy.exc.StatementError: (builtins.AttributeError) 'XRaySession' object has no attribute 'cursor'
我有这些联系方式:
# Pooling
import sqlalchemy.pool as pool
#################### Engines ###################################################
def bpGetConnection():
engine_endpoint = f"mysql+pymysql://{os.environ['DB_USERNAME']}:{os.environ['DB_PASSWORD']}@{os.environ['DB_HOST']}:{str(os.environ['DB_PORT'])}/{os.environ['database']}"
engine = create_engine(engine_endpoint, echo_pool=True)
session = XRaySessionMaker(bind=engine, autoflush=True, autocommit=False)
db = session()
return db
bpPool = pool.StaticPool(bpGetConnection)
def companyGetConnection(database):
engine_endpoint = f"mysql+pymysql://{os.environ['DB_USERNAME']}:{os.environ['DB_PASSWORD']}@{os.environ['DB_HOST']}:{str(os.environ['DB_PORT'])}/{database}"
compEngine = create_engine(engine_endpoint, pool=bpPool)
session = XRaySessionMaker(bind=compEngine, autoflush=True, autocommit=False)
db = Session()
return db
#################### POOLING #############################################
def bpConnect():
conn = bpPool.connect()
return conn
def companyConnect(database):
conn = companyGetConnection(database)
return conn
#################################################################
他们在这个例子中被调用:
from connections import companyConnect, bpConnect
from models import Company, Customers
def getCustomers(companyID):
db = bpConnect()
myQuery = db.query(Company).filter(Company.id == companyID).one()
compDB = companyConnect(myQuery.database)
customers = compDB.query(Customers).all()
return customers
我想出了如何在 lambda 上使用动态池来做到这一点:
class DBRegistry(object):
_db = {}
def get(self, url, **kwargs):
if url not in self._db:
engine = create_engine(url, **kwargs)
Session = XRaySessionMaker(bind=engine, autoflush=True, autocommit=False)
session = scoped_session(Session)
self._db[url] = session
return self._db[url]
compDB = DBRegistry()
def bpGetConnection():
engine_endpoint = f"mysql+pymysql://{os.environ['DB_USERNAME']}:{os.environ['DB_PASSWORD']}@{os.environ['DB_HOST']}:{str(os.environ['DB_PORT'])}/{os.environ['database']}?charset=utf8"
engine = create_engine(engine_endpoint)
session = XRaySessionMaker(bind=engine, autoflush=True, autocommit=False)
db = session()
return db
bpPool = pool.QueuePool(bpGetConnection, pool_size=500, timeout=11)
def bpConnect():
conn = bpPool.connect()
return conn
def companyConnect(database):
engine_endpoint = f"mysql+pymysql://{os.environ['DB_USERNAME']}:{os.environ['DB_PASSWORD']}@{os.environ['DB_HOST']}:{str(os.environ['DB_PORT'])}/{database}?charset=utf8"
conn = compDB.get(engine_endpoint, poolclass=QueuePool)
return conn
所以基本上它将使用一个池用于主数据库上所需的持续连接,并使用另一个池动态更改所需的数据库。当需要连接到这些公司数据库之一时,它将检查该池是否已存在于池注册表中。如果池不存在,它将创建一个并注册它。