SQLAlchemy - 多个数据库问题
SQLAlchemy - Multiple Database Issues
我们正在使用 SQLAlchemy 制作游戏服务器。
因为游戏服务器必须非常快,所以我们决定根据用户ID(整数)分离数据库。
例如,我像下面这样成功地做到了。
from threading import Thread
from sqlalchemy import Column, Integer, String, DateTime, create_engine
from sqlalchemy.ext.declarative import declarative_base, DeferredReflection
from sqlalchemy.orm import sessionmaker
DeferredBase = declarative_base(cls=DeferredReflection)
class BuddyModel(DeferredBase):
__tablename__ = 'test_x'
id = Column(Integer(), primary_key=True, autoincrement=True)
value = Column(String(50), nullable=False)
下一个代码将创建多个数据库。
会有test1~test10个数据库。
for i in range(10):
url = 'mysql://user@localhost/'
engine = create_engine(url, encoding='UTF-8', pool_recycle=300)
con = engine.connect()
con.execute('create database test%d' % i)
以下代码将创建 10 个独立的引擎。
get_engine() 函数将根据用户 ID 为您提供引擎。
(用户ID为整数)
engines = []
for i in range(10):
url = 'mysql://user@localhost/test%d'% i
engine = create_engine(url, encoding='UTF-8', pool_recycle=300)
DeferredBase.metadata.bind = engine
DeferredBase.metadata.create_all()
engines.append(engine)
def get_engine(user_id):
index = user_id%10
return engines[index]
通过运行准备功能,BuddyModel class 将准备好,并映射到引擎。
def prepare(user_id):
engine = get_engine(user_id)
DeferredBase.prepare(engine)
** 下一段代码会做我想做的事情 **
for user_id in range(100):
prepare(user_id)
engine = get_engine(user_id)
session = sessionmaker(engine)()
buddy = BuddyModel()
buddy.value = 'user_id: %d' % user_id
session.add(buddy)
session.commit()
但问题是,当我在多线程中执行时,它只会引发错误
class MetalMultidatabaseThread(Thread):
def run(self):
for user_id in range(100):
prepare(user_id)
engine = get_engine(user_id)
session = sessionmaker(engine)()
buddy = BuddyModel()
buddy.value = 'user_id: %d' % user_id
session.add(buddy)
session.commit()
threads = []
for i in range(100):
t = MetalMultidatabaseThread()
t.setDaemon(True)
t.start()
threads.append(t)
for t in threads:
t.join()
错误信息是...
ArgumentError: Class '<class '__main__.BuddyModel'>' already has a primary mapper defined. Use non_primary=True to create a non primary Mapper. clear_mappers() will remove *all* current mappers from all classes.
所以..我的问题是我如何使用 SQLAlchemy 像上面的架构那样做多个数据库?
我找到了问题的答案。
要根据用户 ID(整数)建立多个数据库,只需使用会话。
在解释这个之前,我想多阐述一下数据库架构。
例如,如果用户 ID 114 连接到服务器,服务器将使用类似这样的方式来确定从何处检索用户信息。
user_id%10 # <-- 4th database
建筑
DATABASES
- DB0 <-- save all user data whose ID ends with 0
- DB1 <-- save all user data whose ID ends with 1
.
.
.
- DB8 <-- save all user data whose ID ends with 9
答案在这里
首先不要使用绑定参数..只需将其设为空即可。
Base = declarative_base()
声明模型..
class BuddyModel(Base):
__tablename__ = 'test_x'
id = Column(Integer(), primary_key=True, autoincrement=True)
value = Column(String(50), nullable=False)
当你想做增删改查时,创建一个会话
engine = get_engine_by_user_id(user_id)
session = sessionmaker(bind=engine)()
buddy = BuddyModel()
buddy.value = 'This is Sparta!! %d' % user_id
session.add(buddy)
session.commit()
引擎应该是与用户ID匹配的引擎。
这称为水平分片,是一个有点棘手的用例。您拥有的版本,基于首先获取引擎进行会话,可以正常工作。您可能会喜欢它的两种变体。
一种是使用horizontal sharding extension。此扩展允许您创建会话以自动 select 正确的节点。
另一个或多或少是你所拥有的,但不那么冗长。构建一个具有 routing function 的会话 class,这样您至少可以共享一个会话并说,session.using_bind('engine1')
用于查询而不是创建一个全新的会话。
我们正在使用 SQLAlchemy 制作游戏服务器。
因为游戏服务器必须非常快,所以我们决定根据用户ID(整数)分离数据库。
例如,我像下面这样成功地做到了。
from threading import Thread
from sqlalchemy import Column, Integer, String, DateTime, create_engine
from sqlalchemy.ext.declarative import declarative_base, DeferredReflection
from sqlalchemy.orm import sessionmaker
DeferredBase = declarative_base(cls=DeferredReflection)
class BuddyModel(DeferredBase):
__tablename__ = 'test_x'
id = Column(Integer(), primary_key=True, autoincrement=True)
value = Column(String(50), nullable=False)
下一个代码将创建多个数据库。
会有test1~test10个数据库。
for i in range(10):
url = 'mysql://user@localhost/'
engine = create_engine(url, encoding='UTF-8', pool_recycle=300)
con = engine.connect()
con.execute('create database test%d' % i)
以下代码将创建 10 个独立的引擎。
get_engine() 函数将根据用户 ID 为您提供引擎。
(用户ID为整数)
engines = []
for i in range(10):
url = 'mysql://user@localhost/test%d'% i
engine = create_engine(url, encoding='UTF-8', pool_recycle=300)
DeferredBase.metadata.bind = engine
DeferredBase.metadata.create_all()
engines.append(engine)
def get_engine(user_id):
index = user_id%10
return engines[index]
通过运行准备功能,BuddyModel class 将准备好,并映射到引擎。
def prepare(user_id):
engine = get_engine(user_id)
DeferredBase.prepare(engine)
** 下一段代码会做我想做的事情 **
for user_id in range(100):
prepare(user_id)
engine = get_engine(user_id)
session = sessionmaker(engine)()
buddy = BuddyModel()
buddy.value = 'user_id: %d' % user_id
session.add(buddy)
session.commit()
但问题是,当我在多线程中执行时,它只会引发错误
class MetalMultidatabaseThread(Thread):
def run(self):
for user_id in range(100):
prepare(user_id)
engine = get_engine(user_id)
session = sessionmaker(engine)()
buddy = BuddyModel()
buddy.value = 'user_id: %d' % user_id
session.add(buddy)
session.commit()
threads = []
for i in range(100):
t = MetalMultidatabaseThread()
t.setDaemon(True)
t.start()
threads.append(t)
for t in threads:
t.join()
错误信息是...
ArgumentError: Class '<class '__main__.BuddyModel'>' already has a primary mapper defined. Use non_primary=True to create a non primary Mapper. clear_mappers() will remove *all* current mappers from all classes.
所以..我的问题是我如何使用 SQLAlchemy 像上面的架构那样做多个数据库?
我找到了问题的答案。
要根据用户 ID(整数)建立多个数据库,只需使用会话。
在解释这个之前,我想多阐述一下数据库架构。
例如,如果用户 ID 114 连接到服务器,服务器将使用类似这样的方式来确定从何处检索用户信息。
user_id%10 # <-- 4th database
建筑
DATABASES
- DB0 <-- save all user data whose ID ends with 0
- DB1 <-- save all user data whose ID ends with 1
.
.
.
- DB8 <-- save all user data whose ID ends with 9
答案在这里
首先不要使用绑定参数..只需将其设为空即可。
Base = declarative_base()
声明模型..
class BuddyModel(Base):
__tablename__ = 'test_x'
id = Column(Integer(), primary_key=True, autoincrement=True)
value = Column(String(50), nullable=False)
当你想做增删改查时,创建一个会话
engine = get_engine_by_user_id(user_id)
session = sessionmaker(bind=engine)()
buddy = BuddyModel()
buddy.value = 'This is Sparta!! %d' % user_id
session.add(buddy)
session.commit()
引擎应该是与用户ID匹配的引擎。
这称为水平分片,是一个有点棘手的用例。您拥有的版本,基于首先获取引擎进行会话,可以正常工作。您可能会喜欢它的两种变体。
一种是使用horizontal sharding extension。此扩展允许您创建会话以自动 select 正确的节点。
另一个或多或少是你所拥有的,但不那么冗长。构建一个具有 routing function 的会话 class,这样您至少可以共享一个会话并说,session.using_bind('engine1')
用于查询而不是创建一个全新的会话。