如何在 peewee-orm 中使用 ThreadPoolExecutor 处理数据库连接池?
How to handle database connectionpool with ThreadPoolExecutor in peewee-orm?
我在脚本中使用 peewee 和 mariadb 数据库,该脚本查看数据库 table 并将来自该查询的数据提交到 ThreadPoolExecutor。工作人员本身也需要查询数据库。所有期货完成后,脚本再次从头开始。
当我使用单个连接时,一切正常,但由于我的工作者工作主要是网络 IO,我觉得所有工作线程的单个连接将成为瓶颈。
如果我更改为数据库池,我可以监视连接的增加,直到我从 peewee 收到错误“太多打开的连接”。连接永远不会自行关闭。这适合 peewee 的 documentation
但是我不知道如何从我的工作函数内部手动打开和关闭数据库连接。
我尝试将 models.py 中的数据库变量设置为全局变量,然后我可以在我的 worker 中访问该对象,但是观察我的数据库上所有打开的连接让我意识到 .close()/.open() 没有在这种情况下的效果。
我也将所有内容粘贴到一个文件中,我仍然无法手动 open/close 连接。
documentation 仅给出了如何使用具有不同 webframeworks 的池的示例。
我的应用得到简化
#app.py
from models.models import MyTableA, MyTableB
def do_work(data):
MyTableB.create(name="foo")
def main()
logger = logging.getLogger()
data = MyTableA.select().dicts()
with ThreadPoolExecutor(8) as executor:
future_to_system = {executor.submit(do_work, d): d.id for d in data}
for future in as_completed(future_to_system):
system = future_to_system[future]
try:
future.result()
except Exception as exc:
logger.info('%r generated an exception: %s' % (system, exc))
else:
logger.debug('%r result is %s' % (system, "ok"))
if __name__ == '__main__':
main()
models.py
from peewee import *
from playhouse.pool import PooledMySQLDatabase
#uncomment this line to use pool
#database = PooledMySQLDatabase('db_name', **{'charset': 'utf8', 'sql_mode': 'PIPES_AS_CONCAT', 'use_unicode': True, 'user': 'user', 'passwd': 'pass', 'max_connections': 32, 'stale_timeout': 300})
#comment that line to use pool
database = PooledMySQLDatabase('db_name', **{'charset': 'utf8', 'sql_mode': 'PIPES_AS_CONCAT', 'use_unicode': True, 'user': 'user', 'passwd': 'pass'})
class UnknownField(object):
def __init__(self, *_, **__): pass
class BaseModel(Model):
class Meta:
database = database
class MyTableA(BaseModel):
name = CharField()
class Meta:
table_name = 'my_table'
class MyTableB(BaseModel):
name = CharField()
class Meta:
table_name = 'my_table'
如果有人知道如何将 peewee 的连接池与 Threadpoolexecutor 一起使用,我将不胜感激。
我找到了解决办法here
models.py
from peewee import *
class UnknownField(object):
def __init__(self, *_, **__): pass
class BaseModel(Model):
class Meta:
database = None
class MyTableA(BaseModel):
name = CharField()
class Meta:
table_name = 'my_table'
class MyTableB(BaseModel):
name = CharField()
class Meta:
table_name = 'my_table'
app.py
from playhouse.pool import PooledMySQLDatabase
from models.models import MyTableA, MyTableB
def do_work(data, db):
with db:
MyTableB.create(name="foo")
def main()
logger = logging.getLogger()
database = PooledMySQLDatabase('db_name', **{'charset': 'utf8', 'sql_mode': 'PIPES_AS_CONCAT', 'use_unicode': True, 'user': 'user', 'passwd': 'pass', 'max_connections': 32, 'stale_timeout': 300})
database.bind(
[
MyTableA, MyTableB
]
)
with database:
data = MyTableA.select().dicts()
with ThreadPoolExecutor(8) as executor:
future_to_system = {executor.submit(do_work, d): d.id for d in data}
for future in as_completed(future_to_system):
system = future_to_system[future]
try:
future.result()
except Exception as exc:
logger.info('%r generated an exception: %s' % (system, exc))
else:
logger.debug('%r result is %s' % (system, "ok"))
if __name__ == '__main__':
main()
我在脚本中使用 peewee 和 mariadb 数据库,该脚本查看数据库 table 并将来自该查询的数据提交到 ThreadPoolExecutor。工作人员本身也需要查询数据库。所有期货完成后,脚本再次从头开始。
当我使用单个连接时,一切正常,但由于我的工作者工作主要是网络 IO,我觉得所有工作线程的单个连接将成为瓶颈。
如果我更改为数据库池,我可以监视连接的增加,直到我从 peewee 收到错误“太多打开的连接”。连接永远不会自行关闭。这适合 peewee 的 documentation
但是我不知道如何从我的工作函数内部手动打开和关闭数据库连接。
我尝试将 models.py 中的数据库变量设置为全局变量,然后我可以在我的 worker 中访问该对象,但是观察我的数据库上所有打开的连接让我意识到 .close()/.open() 没有在这种情况下的效果。
我也将所有内容粘贴到一个文件中,我仍然无法手动 open/close 连接。
documentation 仅给出了如何使用具有不同 webframeworks 的池的示例。
我的应用得到简化
#app.py
from models.models import MyTableA, MyTableB
def do_work(data):
MyTableB.create(name="foo")
def main()
logger = logging.getLogger()
data = MyTableA.select().dicts()
with ThreadPoolExecutor(8) as executor:
future_to_system = {executor.submit(do_work, d): d.id for d in data}
for future in as_completed(future_to_system):
system = future_to_system[future]
try:
future.result()
except Exception as exc:
logger.info('%r generated an exception: %s' % (system, exc))
else:
logger.debug('%r result is %s' % (system, "ok"))
if __name__ == '__main__':
main()
models.py
from peewee import *
from playhouse.pool import PooledMySQLDatabase
#uncomment this line to use pool
#database = PooledMySQLDatabase('db_name', **{'charset': 'utf8', 'sql_mode': 'PIPES_AS_CONCAT', 'use_unicode': True, 'user': 'user', 'passwd': 'pass', 'max_connections': 32, 'stale_timeout': 300})
#comment that line to use pool
database = PooledMySQLDatabase('db_name', **{'charset': 'utf8', 'sql_mode': 'PIPES_AS_CONCAT', 'use_unicode': True, 'user': 'user', 'passwd': 'pass'})
class UnknownField(object):
def __init__(self, *_, **__): pass
class BaseModel(Model):
class Meta:
database = database
class MyTableA(BaseModel):
name = CharField()
class Meta:
table_name = 'my_table'
class MyTableB(BaseModel):
name = CharField()
class Meta:
table_name = 'my_table'
如果有人知道如何将 peewee 的连接池与 Threadpoolexecutor 一起使用,我将不胜感激。
我找到了解决办法here
models.py
from peewee import *
class UnknownField(object):
def __init__(self, *_, **__): pass
class BaseModel(Model):
class Meta:
database = None
class MyTableA(BaseModel):
name = CharField()
class Meta:
table_name = 'my_table'
class MyTableB(BaseModel):
name = CharField()
class Meta:
table_name = 'my_table'
app.py
from playhouse.pool import PooledMySQLDatabase
from models.models import MyTableA, MyTableB
def do_work(data, db):
with db:
MyTableB.create(name="foo")
def main()
logger = logging.getLogger()
database = PooledMySQLDatabase('db_name', **{'charset': 'utf8', 'sql_mode': 'PIPES_AS_CONCAT', 'use_unicode': True, 'user': 'user', 'passwd': 'pass', 'max_connections': 32, 'stale_timeout': 300})
database.bind(
[
MyTableA, MyTableB
]
)
with database:
data = MyTableA.select().dicts()
with ThreadPoolExecutor(8) as executor:
future_to_system = {executor.submit(do_work, d): d.id for d in data}
for future in as_completed(future_to_system):
system = future_to_system[future]
try:
future.result()
except Exception as exc:
logger.info('%r generated an exception: %s' % (system, exc))
else:
logger.debug('%r result is %s' % (system, "ok"))
if __name__ == '__main__':
main()