如何将 SQLAlchemy 异步会话(AsyncSession)生成器改写为基于类(class-based)的实现?
How to make the SQLAlchemy async session (AsyncSession) generator class-based?
我有一个 FastAPI 应用程序,并在 FastAPI 的启动事件(startup event)中以后台任务的方式运行计划任务。
所以当我在路由范围内使用 SQLAlchemy 异步会话时,例如:
session: AsyncSession=Depends(instance_manger.db_instance.get_db_session)
一切正常,运行结果也是正确的;但是当它在后台任务中运行时,就会出现下面的错误。
我使用的 Python 模块:SQLAlchemy[asyncio]、asyncmy、pymysql、fastapi
database.py
class DBManager:
    """Hold the async engine and the task-scoped session factory for the app.

    The object is created empty; ``initialize`` must be called before
    ``get_db_session`` is used.
    """

    def __init__(self):
        # Everything starts unset and is filled in by ``initialize``.
        self.SQLALCHEMY_DATABASE_URL = None
        self.config_reader_instance = None
        self.engine = None
        self._session_factory = None
        self.logger_handler_instance = None
        self.db = None

    def initialize(self, config_reader_instance, logger_handler_instance):
        """Build the database URL, the async engine and the session factory.

        Args:
            config_reader_instance: object exposing a ``DB_INFO`` mapping with
                the keys ``db_username``, ``db_password``, ``db_hostname``,
                ``db_port`` and ``db_name``.
            logger_handler_instance: object exposing ``write_log``; used by
                ``get_db_session`` to report failures.
        """
        self.logger_handler_instance = logger_handler_instance
        self.config_reader_instance = config_reader_instance

        db_info = self.config_reader_instance.DB_INFO
        self.SQLALCHEMY_DATABASE_URL = "mysql+asyncmy://{0}:{1}@{2}:{3}/{4}".format(
            db_info['db_username'],
            db_info['db_password'],
            db_info['db_hostname'],
            db_info['db_port'],
            db_info['db_name'],
        )
        self.engine = create_async_engine(self.SQLALCHEMY_DATABASE_URL, pool_pre_ping=True)

        # Sessions are scoped per asyncio task via ``current_task``.
        maker = sessionmaker(self.engine, class_=AsyncSession, expire_on_commit=False)
        self._session_factory = async_scoped_session(maker, scopefunc=current_task)

    async def get_db_session(self) -> AsyncSession:
        """Yield one session, rolling back on error and always closing it.

        This is an async generator (despite the ``AsyncSession`` annotation),
        so calling it returns an async-generator object rather than a session.
        It has to be driven by something that iterates it, such as a FastAPI
        ``Depends``.
        """
        async with self._session_factory() as db_session:
            try:
                yield db_session
            except Exception as exc:
                write_log = self.logger_handler_instance.write_log
                write_log(__name__, logging.FATAL, 'Session rollback because of exception')
                write_log(__name__, logging.FATAL, exc)
                await db_session.rollback()
                raise
            finally:
                await db_session.close()
background_thread.py
class BackgroundRunnable:
    """Background job that queries the coin table outside of a request."""

    def __init__(self):
        # NOTE: ``instance_manger`` (sic) is the attribute name used throughout
        # this class, so the spelling is kept.
        self.instance_manger = None
        self.core_process_instance = None
        self.conf_reader_instance = None
        self.process_id = None
        self.process_name = "BTC"

    def initialize(self, instance_manager: InstanceManager):
        """Store the shared instance manager and return ``self`` for chaining."""
        self.instance_manger = instance_manager
        return self

    def set_process_info(self, process_name):
        """Record the current OS process id together with ``process_name``."""
        self.process_id = os.getpid()
        self.process_name = process_name

    async def run_main(self):
        """Run the coin lookup once and print its result.

        A dedicated ``AsyncSession`` is opened on the shared engine and is
        closed when the lookup finishes, including when it raises. The
        previous version never closed this session.
        """
        self.instance_manger.logger_handler_instance.write_log(
            __name__, logging.INFO, "Background Thread is start")

        engine = self.instance_manger.db_instance.engine
        async with AsyncSession(engine) as session:
            results = await CryptoCoinService(CryptoCoinRepository(session)).get_coin()
            print(results)
crypto_coin_repository.py
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.orm import class_mapper
from db.models.models import CryptoCoinModel
class CryptoCoinRepository:
    """Data-access helper for ``CryptoCoinModel`` rows."""

    def __init__(self, session: AsyncSession) -> None:
        self.session = session

    async def get_all(self) -> bool:
        """Return True if a coin with symbol ``'BTC'`` exists, else False."""
        result = await self.session.execute(
            select(CryptoCoinModel._id).where(CryptoCoinModel._symbol == 'BTC'))
        # fetchone() gives None when nothing matches; the old code called
        # __len__() on that value and raised AttributeError instead of
        # returning False.
        return result.fetchone() is not None
main.py
from fastapi import APIRouter, Depends, Request, Response, FastAPI, status
from fastapi.responses import JSONResponse
from sqlalchemy.ext.asyncio import AsyncSession
from coin_server.background_thread import BackgroundRunnable
from coin_server.core_process import CoreProcess
from core.instance_manager import InstanceManager
from db.database import DBManager
from db.repository.crypto_coin_repository import CryptoCoinRepository
from db.services.crypto_coin_service import CryptoCoinService
deposit_Router = APIRouter()
instance_manager = InstanceManager()
instance_manager.initialize()
db_instance = DBManager()
db_instance.initialize(instance_manager.config_reader_instance, instance_manager.logger_handler_instance)

# Strong references to fire-and-forget tasks. The event loop only keeps weak
# references, so an unreferenced task may be garbage-collected mid-run.
_background_tasks = set()


@deposit_Router.post('/')
async def index(request: Request, session: AsyncSession = Depends(db_instance.get_db_session)):
    """Look up the coin using the request-scoped session and print the result."""
    results = await CryptoCoinService(CryptoCoinRepository(session)).get_coin()
    print(results)


deposit_app = FastAPI()


@deposit_app.on_event('startup')
async def app_startup():
    """Start the background job as an asyncio task when the app starts."""
    # Imported here because asyncio is not among this module's visible imports.
    import asyncio

    background_runnable = BackgroundRunnable()
    background_runnable.initialize(instance_manager)
    task = asyncio.create_task(background_runnable.run_main())
    # Keep the task alive until it finishes, then drop the reference.
    _background_tasks.add(task)
    task.add_done_callback(_background_tasks.discard)


deposit_app.include_router(deposit_Router)
当我运行 FastAPI 应用程序时,输出了如下错误。
INFO: Uvicorn running on http://0.0.0.0:5000 (Press CTRL+C to quit)
INFO: Started reloader process [176] using watchgod
INFO: Started server process [179]
INFO: Waiting for application startup.
Task exception was never retrieved
future: <Task finished name='Task-3' coro=<BackgroundRunnable.run_main() done, defined at /mnt/c/Users/dr_r00t3r/Desktop/main/coin_server/background_thread.py:48> exception=At
tributeError("'async_generator' object has no attribute 'execute'")>
Traceback (most recent call last):
File "/mnt/c/Users/dr_r00t3r/Desktop/main/coin_server/background_thread.py", line 51, in run_main
results = await CryptoCoinService(
File "/mnt/c/Users/dr_r00t3r/Desktop/main/db/repository/crypto_coin_repository.py", line 17, in get_all
results = await self.session.execute(
AttributeError: 'async_generator' object has no attribute 'execute'
INFO: Application startup complete.
请注意:当您把 database.py 中的函数 get_db_session 当作生成器直接使用时(例如用 next() 取出会话),会话不会被自动关闭,因此您需要手动关闭它们。如果您需要更多详细信息,请发送电子邮件至 drr000t3r@gmail.com。祝你好运。
database.py
import logging
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker, scoped_session
Base = declarative_base()
class DBManager:
    """Hold the async engine and a session factory, and hand out sessions.

    The object is created empty; ``initialize`` must be called before
    ``get_db_session`` or ``init_models`` is used.
    """

    def __init__(self):
        # Everything except ``Base`` starts unset and is filled in by ``initialize``.
        self.SQLALCHEMY_DATABASE_URL = None
        self.config_reader_instance = None
        self.engine = None
        self.session_factory = None
        self.Base = declarative_base()
        self.logger_handler_instance = None

    def initialize(self, config_reader_instance, logger_handler_instance):
        """Build the database URL, the pooled async engine and the session factory.

        Args:
            config_reader_instance: object exposing a ``DB_INFO`` mapping with
                the keys ``db_username``, ``db_password``, ``db_hostname``,
                ``db_port`` and ``db_name``.
            logger_handler_instance: object exposing ``log``; used by
                ``get_db_session`` to report failures.
        """
        self.logger_handler_instance = logger_handler_instance
        self.config_reader_instance = config_reader_instance

        db_info = self.config_reader_instance.DB_INFO
        self.SQLALCHEMY_DATABASE_URL = "mysql+asyncmy://{0}:{1}@{2}:{3}/{4}".format(
            db_info['db_username'],
            db_info['db_password'],
            db_info['db_hostname'],
            db_info['db_port'],
            db_info['db_name'],
        )

        engine_options = dict(
            pool_pre_ping=True,
            pool_size=30,
            max_overflow=30,
            echo_pool=True,
            echo=False,
            pool_recycle=3600,  # recycle every hour
        )
        self.engine = create_async_engine(self.SQLALCHEMY_DATABASE_URL, **engine_options)

        # Replaces the class-level declarative base on every call.
        DBManager.Base = declarative_base()

        maker = sessionmaker(self.engine, class_=AsyncSession, expire_on_commit=False)
        self.session_factory = scoped_session(maker)

    def get_db_session(self):
        """Yield one session from the factory (plain, synchronous generator).

        On an exception thrown into the generator, two FATAL records are
        logged, ``rollback()`` is called and the exception is re-raised;
        ``close()`` is called in every case.

        NOTE(review): the factory is built with ``class_=AsyncSession``, so
        ``rollback()`` and ``close()`` presumably return coroutines that are
        never awaited here. Confirm; callers may need to
        ``await session.close()`` themselves.
        """
        db_session = self.session_factory()
        try:
            yield db_session
        except Exception as exc:
            log = self.logger_handler_instance.log
            log(__name__, logging.FATAL, 'Session rollback because of exception')
            log(__name__, logging.FATAL, exc)
            db_session.rollback()
            raise
        finally:
            db_session.close()

    async def init_models(self):
        """Drop every table known to the module-level ``Base``, then recreate them.

        Destructive: ``drop_all`` runs before ``create_all`` in one connection.
        """
        async with self.engine.begin() as conn:
            for schema_op in (Base.metadata.drop_all, Base.metadata.create_all):
                await conn.run_sync(schema_op)
background_thread.py
class BackgroundRunnable:
    """Background job that queries the coin table outside of a request."""

    def __init__(self):
        # NOTE: ``instance_manger`` (sic) is the attribute name used throughout
        # this class, so the spelling is kept.
        self.instance_manger = None
        self.core_process_instance = None
        self.conf_reader_instance = None
        self.process_id = None
        self.process_name = "BTC"

    def initialize(self, instance_manager: InstanceManager):
        """Store the shared instance manager and return ``self`` for chaining."""
        self.instance_manger = instance_manager
        return self

    def set_process_info(self, process_name):
        """Record the current OS process id together with ``process_name``."""
        self.process_id = os.getpid()
        self.process_name = process_name

    async def run_main(self):
        """Run the coin lookup once for the configured BTC coin and print it.

        The session is pulled out of the ``get_db_session`` generator with
        ``next()``, so the generator's own cleanup never runs for it. It is
        therefore closed explicitly here, including when the lookup raises.
        """
        self.instance_manger.logger_handler_instance.write_log(
            __name__, logging.INFO, "Background Thread is start")

        self.session: AsyncSession = next(self.instance_manger.db_instance.get_db_session())
        try:
            results = await CryptoCoinService(CryptoCoinRepository(self.session)).get_coin(
                self.instance_manger.config_reader_instance.BTC_INFO['BTC_COIN'])
            print(results)
        finally:
            # AsyncSession.close() is a coroutine and must be awaited to
            # release the connection.
            await self.session.close()
crypto_coin_repository.py
"""Repositories module."""
from contextlib import AbstractContextManager
from typing import Callable
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.orm import class_mapper, Session
from db.models.models import CryptoCoinModel
class CryptoCoinRepository:
    """Data-access helper for ``CryptoCoinModel`` rows."""

    def __init__(self, session: AsyncSession) -> None:
        self.session = session

    async def get_all(self, coin) -> bool:
        """Return True if at least one row has ``_symbol == coin``, else False."""
        query = select(CryptoCoinModel._id).where(CryptoCoinModel._symbol == coin)
        rows = (await self.session.execute(query)).fetchall()
        return len(rows) != 0

    def serialize(self, model):
        """Return a dict of the model's mapped columns, suitable for JSON dumping.

        Keys are the mapper's column keys; each value is read from the
        attribute named with a leading underscore (``'_' + key``).
        """
        mapper = class_mapper(model.__class__)
        return {
            column.key: getattr(model, '_' + column.key)
            for column in mapper.columns
        }
class NotFoundError(Exception):
    """Raised when a requested coin symbol is missing from the database.

    Args:
        symbol: the missing coin symbol. When omitted, the class-level
            ``symbol`` is used, so ``NotFoundError()`` still works.
    """

    # Fallback used in the message when no symbol is passed; subclasses may
    # override it.
    symbol: str = 'coin'

    def __init__(self, symbol=None):
        if symbol is not None:
            self.symbol = symbol
        # The old code read ``self._symobl`` (a typo for an attribute that was
        # never set), so constructing the exception raised AttributeError.
        super().__init__(f'{self.symbol} not found,please add this coin to db')


class CryptoCoinNotFoundError(NotFoundError):
    """Raised when a crypto coin is missing from the database."""
main.py
from fastapi import APIRouter, Depends, Request, Response, FastAPI, status
from fastapi.responses import JSONResponse
from sqlalchemy.ext.asyncio import AsyncSession
from coin_server.background_thread import BackgroundRunnable
from coin_server.core_process import CoreProcess
from core.instance_manager import InstanceManager
from db.database import DBManager
from db.repository.crypto_coin_repository import CryptoCoinRepository
from db.services.crypto_coin_service import CryptoCoinService
deposit_Router = APIRouter()
instance_manager = InstanceManager()
instance_manager.initialize()
db_instance = DBManager()
db_instance.initialize(instance_manager.config_reader_instance, instance_manager.logger_handler_instance)

# Strong references to fire-and-forget tasks. The event loop only keeps weak
# references, so an unreferenced task may be garbage-collected mid-run.
_background_tasks = set()


@deposit_Router.post('/')
async def index(request: Request, session: AsyncSession = Depends(db_instance.get_db_session)):
    """Look up the coin using the request-scoped session and print the result."""
    results = await CryptoCoinService(CryptoCoinRepository(session)).get_coin()
    print(results)


deposit_app = FastAPI()


@deposit_app.on_event('startup')
async def app_startup():
    """Start the background job as an asyncio task when the app starts."""
    # Imported here because asyncio is not among this module's visible imports.
    import asyncio

    background_runnable = BackgroundRunnable()
    background_runnable.initialize(instance_manager)
    task = asyncio.create_task(background_runnable.run_main())
    # Keep the task alive until it finishes, then drop the reference.
    _background_tasks.add(task)
    task.add_done_callback(_background_tasks.discard)


deposit_app.include_router(deposit_Router)
我有一个 FastAPI 应用程序,并在 FastAPI 的启动事件(startup event)中以后台任务的方式运行计划任务。
所以当我在路由范围内使用 SQLAlchemy 异步会话时,例如:
session: AsyncSession=Depends(instance_manger.db_instance.get_db_session)
一切正常,运行结果也是正确的;但是当它在后台任务中运行时,就会出现下面的错误。
我使用的 Python 模块:SQLAlchemy[asyncio]、asyncmy、pymysql、fastapi
database.py
class DBManager:
    """Hold the async engine and the task-scoped session factory for the app.

    The object is created empty; ``initialize`` must be called before
    ``get_db_session`` is used.
    """

    def __init__(self):
        # Everything starts unset and is filled in by ``initialize``.
        self.SQLALCHEMY_DATABASE_URL = None
        self.config_reader_instance = None
        self.engine = None
        self._session_factory = None
        self.logger_handler_instance = None
        self.db = None

    def initialize(self, config_reader_instance, logger_handler_instance):
        """Build the database URL, the async engine and the session factory.

        Args:
            config_reader_instance: object exposing a ``DB_INFO`` mapping with
                the keys ``db_username``, ``db_password``, ``db_hostname``,
                ``db_port`` and ``db_name``.
            logger_handler_instance: object exposing ``write_log``; used by
                ``get_db_session`` to report failures.
        """
        self.logger_handler_instance = logger_handler_instance
        self.config_reader_instance = config_reader_instance

        db_info = self.config_reader_instance.DB_INFO
        self.SQLALCHEMY_DATABASE_URL = "mysql+asyncmy://{0}:{1}@{2}:{3}/{4}".format(
            db_info['db_username'],
            db_info['db_password'],
            db_info['db_hostname'],
            db_info['db_port'],
            db_info['db_name'],
        )
        self.engine = create_async_engine(self.SQLALCHEMY_DATABASE_URL, pool_pre_ping=True)

        # Sessions are scoped per asyncio task via ``current_task``.
        maker = sessionmaker(self.engine, class_=AsyncSession, expire_on_commit=False)
        self._session_factory = async_scoped_session(maker, scopefunc=current_task)

    async def get_db_session(self) -> AsyncSession:
        """Yield one session, rolling back on error and always closing it.

        This is an async generator (despite the ``AsyncSession`` annotation),
        so calling it returns an async-generator object rather than a session.
        It has to be driven by something that iterates it, such as a FastAPI
        ``Depends``.
        """
        async with self._session_factory() as db_session:
            try:
                yield db_session
            except Exception as exc:
                write_log = self.logger_handler_instance.write_log
                write_log(__name__, logging.FATAL, 'Session rollback because of exception')
                write_log(__name__, logging.FATAL, exc)
                await db_session.rollback()
                raise
            finally:
                await db_session.close()
background_thread.py
class BackgroundRunnable:
    """Background job that queries the coin table outside of a request."""

    def __init__(self):
        # NOTE: ``instance_manger`` (sic) is the attribute name used throughout
        # this class, so the spelling is kept.
        self.instance_manger = None
        self.core_process_instance = None
        self.conf_reader_instance = None
        self.process_id = None
        self.process_name = "BTC"

    def initialize(self, instance_manager: InstanceManager):
        """Store the shared instance manager and return ``self`` for chaining."""
        self.instance_manger = instance_manager
        return self

    def set_process_info(self, process_name):
        """Record the current OS process id together with ``process_name``."""
        self.process_id = os.getpid()
        self.process_name = process_name

    async def run_main(self):
        """Run the coin lookup once and print its result.

        A dedicated ``AsyncSession`` is opened on the shared engine and is
        closed when the lookup finishes, including when it raises. The
        previous version never closed this session.
        """
        self.instance_manger.logger_handler_instance.write_log(
            __name__, logging.INFO, "Background Thread is start")

        engine = self.instance_manger.db_instance.engine
        async with AsyncSession(engine) as session:
            results = await CryptoCoinService(CryptoCoinRepository(session)).get_coin()
            print(results)
crypto_coin_repository.py
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.orm import class_mapper
from db.models.models import CryptoCoinModel
class CryptoCoinRepository:
    """Data-access helper for ``CryptoCoinModel`` rows."""

    def __init__(self, session: AsyncSession) -> None:
        self.session = session

    async def get_all(self) -> bool:
        """Return True if a coin with symbol ``'BTC'`` exists, else False."""
        result = await self.session.execute(
            select(CryptoCoinModel._id).where(CryptoCoinModel._symbol == 'BTC'))
        # fetchone() gives None when nothing matches; the old code called
        # __len__() on that value and raised AttributeError instead of
        # returning False.
        return result.fetchone() is not None
main.py
from fastapi import APIRouter, Depends, Request, Response, FastAPI, status
from fastapi.responses import JSONResponse
from sqlalchemy.ext.asyncio import AsyncSession
from coin_server.background_thread import BackgroundRunnable
from coin_server.core_process import CoreProcess
from core.instance_manager import InstanceManager
from db.database import DBManager
from db.repository.crypto_coin_repository import CryptoCoinRepository
from db.services.crypto_coin_service import CryptoCoinService
deposit_Router = APIRouter()
instance_manager = InstanceManager()
instance_manager.initialize()
db_instance = DBManager()
db_instance.initialize(instance_manager.config_reader_instance, instance_manager.logger_handler_instance)

# Strong references to fire-and-forget tasks. The event loop only keeps weak
# references, so an unreferenced task may be garbage-collected mid-run.
_background_tasks = set()


@deposit_Router.post('/')
async def index(request: Request, session: AsyncSession = Depends(db_instance.get_db_session)):
    """Look up the coin using the request-scoped session and print the result."""
    results = await CryptoCoinService(CryptoCoinRepository(session)).get_coin()
    print(results)


deposit_app = FastAPI()


@deposit_app.on_event('startup')
async def app_startup():
    """Start the background job as an asyncio task when the app starts."""
    # Imported here because asyncio is not among this module's visible imports.
    import asyncio

    background_runnable = BackgroundRunnable()
    background_runnable.initialize(instance_manager)
    task = asyncio.create_task(background_runnable.run_main())
    # Keep the task alive until it finishes, then drop the reference.
    _background_tasks.add(task)
    task.add_done_callback(_background_tasks.discard)


deposit_app.include_router(deposit_Router)
当我运行 FastAPI 应用程序时,输出了如下错误。
INFO: Uvicorn running on http://0.0.0.0:5000 (Press CTRL+C to quit)
INFO: Started reloader process [176] using watchgod
INFO: Started server process [179]
INFO: Waiting for application startup.
Task exception was never retrieved
future: <Task finished name='Task-3' coro=<BackgroundRunnable.run_main() done, defined at /mnt/c/Users/dr_r00t3r/Desktop/main/coin_server/background_thread.py:48> exception=At
tributeError("'async_generator' object has no attribute 'execute'")>
Traceback (most recent call last):
File "/mnt/c/Users/dr_r00t3r/Desktop/main/coin_server/background_thread.py", line 51, in run_main
results = await CryptoCoinService(
File "/mnt/c/Users/dr_r00t3r/Desktop/main/db/repository/crypto_coin_repository.py", line 17, in get_all
results = await self.session.execute(
AttributeError: 'async_generator' object has no attribute 'execute'
INFO: Application startup complete.
请注意:当您把 database.py 中的函数 get_db_session 当作生成器直接使用时(例如用 next() 取出会话),会话不会被自动关闭,因此您需要手动关闭它们。如果您需要更多详细信息,请发送电子邮件至 drr000t3r@gmail.com。祝你好运。
database.py
import logging
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker, scoped_session
Base = declarative_base()
class DBManager:
    """Hold the async engine and a session factory, and hand out sessions.

    The object is created empty; ``initialize`` must be called before
    ``get_db_session`` or ``init_models`` is used.
    """

    def __init__(self):
        # Everything except ``Base`` starts unset and is filled in by ``initialize``.
        self.SQLALCHEMY_DATABASE_URL = None
        self.config_reader_instance = None
        self.engine = None
        self.session_factory = None
        self.Base = declarative_base()
        self.logger_handler_instance = None

    def initialize(self, config_reader_instance, logger_handler_instance):
        """Build the database URL, the pooled async engine and the session factory.

        Args:
            config_reader_instance: object exposing a ``DB_INFO`` mapping with
                the keys ``db_username``, ``db_password``, ``db_hostname``,
                ``db_port`` and ``db_name``.
            logger_handler_instance: object exposing ``log``; used by
                ``get_db_session`` to report failures.
        """
        self.logger_handler_instance = logger_handler_instance
        self.config_reader_instance = config_reader_instance

        db_info = self.config_reader_instance.DB_INFO
        self.SQLALCHEMY_DATABASE_URL = "mysql+asyncmy://{0}:{1}@{2}:{3}/{4}".format(
            db_info['db_username'],
            db_info['db_password'],
            db_info['db_hostname'],
            db_info['db_port'],
            db_info['db_name'],
        )

        engine_options = dict(
            pool_pre_ping=True,
            pool_size=30,
            max_overflow=30,
            echo_pool=True,
            echo=False,
            pool_recycle=3600,  # recycle every hour
        )
        self.engine = create_async_engine(self.SQLALCHEMY_DATABASE_URL, **engine_options)

        # Replaces the class-level declarative base on every call.
        DBManager.Base = declarative_base()

        maker = sessionmaker(self.engine, class_=AsyncSession, expire_on_commit=False)
        self.session_factory = scoped_session(maker)

    def get_db_session(self):
        """Yield one session from the factory (plain, synchronous generator).

        On an exception thrown into the generator, two FATAL records are
        logged, ``rollback()`` is called and the exception is re-raised;
        ``close()`` is called in every case.

        NOTE(review): the factory is built with ``class_=AsyncSession``, so
        ``rollback()`` and ``close()`` presumably return coroutines that are
        never awaited here. Confirm; callers may need to
        ``await session.close()`` themselves.
        """
        db_session = self.session_factory()
        try:
            yield db_session
        except Exception as exc:
            log = self.logger_handler_instance.log
            log(__name__, logging.FATAL, 'Session rollback because of exception')
            log(__name__, logging.FATAL, exc)
            db_session.rollback()
            raise
        finally:
            db_session.close()

    async def init_models(self):
        """Drop every table known to the module-level ``Base``, then recreate them.

        Destructive: ``drop_all`` runs before ``create_all`` in one connection.
        """
        async with self.engine.begin() as conn:
            for schema_op in (Base.metadata.drop_all, Base.metadata.create_all):
                await conn.run_sync(schema_op)
background_thread.py
class BackgroundRunnable:
    """Background job that queries the coin table outside of a request."""

    def __init__(self):
        # NOTE: ``instance_manger`` (sic) is the attribute name used throughout
        # this class, so the spelling is kept.
        self.instance_manger = None
        self.core_process_instance = None
        self.conf_reader_instance = None
        self.process_id = None
        self.process_name = "BTC"

    def initialize(self, instance_manager: InstanceManager):
        """Store the shared instance manager and return ``self`` for chaining."""
        self.instance_manger = instance_manager
        return self

    def set_process_info(self, process_name):
        """Record the current OS process id together with ``process_name``."""
        self.process_id = os.getpid()
        self.process_name = process_name

    async def run_main(self):
        """Run the coin lookup once for the configured BTC coin and print it.

        The session is pulled out of the ``get_db_session`` generator with
        ``next()``, so the generator's own cleanup never runs for it. It is
        therefore closed explicitly here, including when the lookup raises.
        """
        self.instance_manger.logger_handler_instance.write_log(
            __name__, logging.INFO, "Background Thread is start")

        self.session: AsyncSession = next(self.instance_manger.db_instance.get_db_session())
        try:
            results = await CryptoCoinService(CryptoCoinRepository(self.session)).get_coin(
                self.instance_manger.config_reader_instance.BTC_INFO['BTC_COIN'])
            print(results)
        finally:
            # AsyncSession.close() is a coroutine and must be awaited to
            # release the connection.
            await self.session.close()
crypto_coin_repository.py
"""Repositories module."""
from contextlib import AbstractContextManager
from typing import Callable
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.orm import class_mapper, Session
from db.models.models import CryptoCoinModel
class CryptoCoinRepository:
    """Data-access helper for ``CryptoCoinModel`` rows."""

    def __init__(self, session: AsyncSession) -> None:
        self.session = session

    async def get_all(self, coin) -> bool:
        """Return True if at least one row has ``_symbol == coin``, else False."""
        query = select(CryptoCoinModel._id).where(CryptoCoinModel._symbol == coin)
        rows = (await self.session.execute(query)).fetchall()
        return len(rows) != 0

    def serialize(self, model):
        """Return a dict of the model's mapped columns, suitable for JSON dumping.

        Keys are the mapper's column keys; each value is read from the
        attribute named with a leading underscore (``'_' + key``).
        """
        mapper = class_mapper(model.__class__)
        return {
            column.key: getattr(model, '_' + column.key)
            for column in mapper.columns
        }
class NotFoundError(Exception):
    """Raised when a requested coin symbol is missing from the database.

    Args:
        symbol: the missing coin symbol. When omitted, the class-level
            ``symbol`` is used, so ``NotFoundError()`` still works.
    """

    # Fallback used in the message when no symbol is passed; subclasses may
    # override it.
    symbol: str = 'coin'

    def __init__(self, symbol=None):
        if symbol is not None:
            self.symbol = symbol
        # The old code read ``self._symobl`` (a typo for an attribute that was
        # never set), so constructing the exception raised AttributeError.
        super().__init__(f'{self.symbol} not found,please add this coin to db')


class CryptoCoinNotFoundError(NotFoundError):
    """Raised when a crypto coin is missing from the database."""
main.py
from fastapi import APIRouter, Depends, Request, Response, FastAPI, status
from fastapi.responses import JSONResponse
from sqlalchemy.ext.asyncio import AsyncSession
from coin_server.background_thread import BackgroundRunnable
from coin_server.core_process import CoreProcess
from core.instance_manager import InstanceManager
from db.database import DBManager
from db.repository.crypto_coin_repository import CryptoCoinRepository
from db.services.crypto_coin_service import CryptoCoinService
deposit_Router = APIRouter()
instance_manager = InstanceManager()
instance_manager.initialize()
db_instance = DBManager()
db_instance.initialize(instance_manager.config_reader_instance, instance_manager.logger_handler_instance)

# Strong references to fire-and-forget tasks. The event loop only keeps weak
# references, so an unreferenced task may be garbage-collected mid-run.
_background_tasks = set()


@deposit_Router.post('/')
async def index(request: Request, session: AsyncSession = Depends(db_instance.get_db_session)):
    """Look up the coin using the request-scoped session and print the result."""
    results = await CryptoCoinService(CryptoCoinRepository(session)).get_coin()
    print(results)


deposit_app = FastAPI()


@deposit_app.on_event('startup')
async def app_startup():
    """Start the background job as an asyncio task when the app starts."""
    # Imported here because asyncio is not among this module's visible imports.
    import asyncio

    background_runnable = BackgroundRunnable()
    background_runnable.initialize(instance_manager)
    task = asyncio.create_task(background_runnable.run_main())
    # Keep the task alive until it finishes, then drop the reference.
    _background_tasks.add(task)
    task.add_done_callback(_background_tasks.discard)


deposit_app.include_router(deposit_Router)