如何将 sqlalchemy 异步会话(Async Session)生成器设为 class-base?

how to make the sqlalchemy async session(Async Session) generator as class-base?

我在后台线程中有快速 API 应用程序和 运行 计划任务作为快速 API 中的启动事件。 所以当我在路由范围内使用 SQlAlchemy 异步会话时,例如: session: AsyncSession=Depends(instance_manger.db_instance.get_db_session) 没问题,运行 是正确的,但是当它在后台线程中为 运行 时,出现以下错误。 我使用 python 模块 => SQLAlchemy[asyncio] asyncmy pymysql fastapi

database.py

class DBManager:
    def __init__(self):
        self.SQLALCHEMY_DATABASE_URL = None
        self.config_reader_instance = None
        self.engine = None
        self._session_factory = None
        self.logger_handler_instance = None
        self.db = None

    def initialize(self, config_reader_instance, logger_handler_instance):
        self.logger_handler_instance = logger_handler_instance
        self.config_reader_instance = config_reader_instance
        self.SQLALCHEMY_DATABASE_URL = "mysql+asyncmy://{0}:{1}@{2}:{3}/{4}".format(
            self.config_reader_instance.DB_INFO['db_username'], self.config_reader_instance.DB_INFO['db_password'],
            self.config_reader_instance.DB_INFO['db_hostname'], self.config_reader_instance.DB_INFO['db_port'],
            self.config_reader_instance.DB_INFO['db_name'])
        self.engine = create_async_engine(self.SQLALCHEMY_DATABASE_URL, pool_pre_ping=True)
        # self.engine.begi/n()
        self._session_factory = async_scoped_session(sessionmaker(
            self.engine, class_=AsyncSession, expire_on_commit=False), scopefunc=current_task)
        # self._session_factory = orm.scoped_session(
        #     orm.sessionmaker(
        #         class_=AsyncSession,
        #         autoflush=False,
        #         bind=self.engine,
        #     ),
        # )

    async def get_db_session(self) -> AsyncSession:
        async with self._session_factory() as session:
            try:
                yield session
            except Exception as e:
                self.logger_handler_instance.write_log(__name__, logging.FATAL,
                                                       'Session rollback because of exception')
                self.logger_handler_instance.write_log(__name__, logging.FATAL, e)
                await session.rollback()
                raise
            finally:
                await session.close()

background_thread.py

class BackgroundRunnable:
    def __init__(self):
        self.instance_manger = None
        self.core_process_instance = None
        self.conf_reader_instance = None
        self.process_id = None
        self.process_name = "BTC"

    def initialize(self, instance_manager: InstanceManager):
        self.instance_manger = instance_manager
        return self

    def set_process_info(self, process_name):
        self.process_id = os.getpid()
        self.process_name = process_name

    async def run_main(self):
        self.instance_manger.logger_handler_instance.write_log(__name__, logging.INFO,
                                                               "Background Thread is start")
        results = await CryptoCoinService(
            CryptoCoinRepository(AsyncSession(self.instance_manger.db_instance.engine))).get_coin()
        print(results)

crypto_coin_repository.py

from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.orm import class_mapper

from db.models.models import CryptoCoinModel


class CryptoCoinRepository:
    def __init__(self, session: AsyncSession) -> None:
        self.session = session

    async def get_all(self) -> bool:
        results = await self.session.execute(
            select(CryptoCoinModel._id).where(CryptoCoinModel._symbol == 'BTC'))
        results_ = results.fetchone()
        if results_.__len__() == 0:
            return False
        else:
            return True

main.py

from fastapi import APIRouter, Depends, Request, Response, FastAPI, status
from fastapi.responses import JSONResponse
from sqlalchemy.ext.asyncio import AsyncSession

from coin_server.background_thread import BackgroundRunnable
from coin_server.core_process import CoreProcess
from core.instance_manager import InstanceManager
from db.database import DBManager
from db.repository.crypto_coin_repository import CryptoCoinRepository
from db.services.crypto_coin_service import CryptoCoinService

deposit_Router = APIRouter()

instance_manager = InstanceManager()
instance_manager.initialize()
db_instance = DBManager()
db_instance.initialize(instance_manager.config_reader_instance, instance_manager.logger_handler_instance)


@deposit_Router.post('/')
async def index(request: Request, session: AsyncSession = Depends(db_instance.get_db_session)):
    results = await CryptoCoinService(CryptoCoinRepository(session)).get_coin()
    print(results)

deposit_app = FastAPI()

@deposit_app.on_event('startup')
async def app_startup():
    background_runnable = BackgroundRunnable()
    background_runnable.initialize(instance_manager)
    asyncio.create_task(background_runnable.run_main())
    # asyncio.create_task(BackgroundRunnable().initialize(instance_manager).run_main())

deposit_app.include_router(deposit_Router)

当我运行快速API应用程序错误时属于输出。

INFO:     Uvicorn running on http://0.0.0.0:5000 (Press CTRL+C to quit)
INFO:     Started reloader process [176] using watchgod
INFO:     Started server process [179]
INFO:     Waiting for application startup.
Task exception was never retrieved
future: <Task finished name='Task-3' coro=<BackgroundRunnable.run_main() done, defined at /mnt/c/Users/dr_r00t3r/Desktop/main/coin_server/background_thread.py:48> exception=At
tributeError("'async_generator' object has no attribute 'execute'")>
Traceback (most recent call last):
  File "/mnt/c/Users/dr_r00t3r/Desktop/main/coin_server/background_thread.py", line 51, in run_main
    results = await CryptoCoinService(
  File "/mnt/c/Users/dr_r00t3r/Desktop/main/db/repository/crypto_coin_repository.py", line 17, in get_all
    results = await self.session.execute(
AttributeError: 'async_generator' object has no attribute 'execute'
INFO:     Application startup complete.

全部注意:当您像生成器一样使用 database.py 中的函数 get_db_session 时,会话的关闭功能不会像自动一样工作,因此您应该像手动一样关闭它们。如果您需要更多详细信息,请发送电子邮件,drr000t3r@gmail.com。祝你好运。

database.py

import logging

from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker, scoped_session

Base = declarative_base()


class DBManager:
    def __init__(self):
        self.SQLALCHEMY_DATABASE_URL = None
        self.config_reader_instance = None
        self.engine = None
        self.session_factory = None
        self.Base = declarative_base()
        self.logger_handler_instance = None

    def initialize(self, config_reader_instance, logger_handler_instance):
        self.logger_handler_instance = logger_handler_instance
        self.config_reader_instance = config_reader_instance
        self.SQLALCHEMY_DATABASE_URL = "mysql+asyncmy://{0}:{1}@{2}:{3}/{4}".format(
            self.config_reader_instance.DB_INFO['db_username'], self.config_reader_instance.DB_INFO['db_password'],
            self.config_reader_instance.DB_INFO['db_hostname'], self.config_reader_instance.DB_INFO['db_port'],
            self.config_reader_instance.DB_INFO['db_name'])
        self.engine = create_async_engine(self.SQLALCHEMY_DATABASE_URL, pool_pre_ping=True, pool_size=30,
                                          max_overflow=30, echo_pool=True, echo=False,
                                          pool_recycle=3600)  # recycle every hour
        DBManager.Base = declarative_base()
        self.session_factory = scoped_session(sessionmaker(
            self.engine, class_=AsyncSession, expire_on_commit=False
        ))

    def get_db_session(self):
        session = self.session_factory()
        try:
            yield session
        except Exception as e:
            self.logger_handler_instance.log(__name__, logging.FATAL,
                                             'Session rollback because of exception')
            self.logger_handler_instance.log(__name__, logging.FATAL, e)
            session.rollback()
            raise
        finally:
            session.close()

    async def init_models(self):
        async with self.engine.begin() as conn:
            await conn.run_sync(Base.metadata.drop_all)
            await conn.run_sync(Base.metadata.create_all)

background_thread.py

class BackgroundRunnable:
    def __init__(self):
        self.instance_manger = None
        self.core_process_instance = None
        self.conf_reader_instance = None
        self.process_id = None
        self.process_name = "BTC"

    def initialize(self, instance_manager: InstanceManager):
        self.instance_manger = instance_manager
        return self

    def set_process_info(self, process_name):
        self.process_id = os.getpid()
        self.process_name = process_name

    async def run_main(self):
        self.instance_manger.logger_handler_instance.write_log(__name__, logging.INFO,
                                                               "Background Thread is start")
        self.session: AsyncSession = next(self.instance_manger.db_instance.get_db_session())
        results = await CryptoCoinService(CryptoCoinRepository(self.session)).get_coin(
            self.instance_manger.config_reader_instance.BTC_INFO['BTC_COIN'])
        print(results)

crypto_coin_repository.py

"""Repositories module."""
from contextlib import AbstractContextManager
from typing import Callable

from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.orm import class_mapper, Session

from db.models.models import CryptoCoinModel


class CryptoCoinRepository:
    def __init__(self, session: AsyncSession) -> None:
        self.session = session

    async def get_all(self, coin) -> bool:
        results = await self.session.execute(
            select(CryptoCoinModel._id).where(CryptoCoinModel._symbol == coin))
        results = results.fetchall()
        if len(results) == 0:
            return False
        else:
            return True

    def serialize(self, model):
        """Transforms a model into a dictionary which can be dumped to JSON."""
        # first we get the names of all the columns on your model
        columns = [c.key for c in class_mapper(model.__class__).columns]
        # then we return their values in a dict
        return dict((c, getattr(model, '_' + c)) for c in columns)


class NotFoundError(Exception):
    symbol: str

    def __init__(self):
        super().__init__(f'{self._symobl} not found,please add this coin to db')


class CryptoCoinNotFoundError(NotFoundError):
    # entity_name: str = 'User'
    pass

main.py

from fastapi import APIRouter, Depends, Request, Response, FastAPI, status
from fastapi.responses import JSONResponse
from sqlalchemy.ext.asyncio import AsyncSession

from coin_server.background_thread import BackgroundRunnable
from coin_server.core_process import CoreProcess
from core.instance_manager import InstanceManager
from db.database import DBManager
from db.repository.crypto_coin_repository import CryptoCoinRepository
from db.services.crypto_coin_service import CryptoCoinService

deposit_Router = APIRouter()

instance_manager = InstanceManager()
instance_manager.initialize()
db_instance = DBManager()
db_instance.initialize(instance_manager.config_reader_instance, instance_manager.logger_handler_instance)


@deposit_Router.post('/')
async def index(request: Request, session: AsyncSession = Depends(db_instance.get_db_session)):
    results = await CryptoCoinService(CryptoCoinRepository(session)).get_coin()
    print(results)

deposit_app = FastAPI()

@deposit_app.on_event('startup')
async def app_startup():
    background_runnable = BackgroundRunnable()
    background_runnable.initialize(instance_manager)
    asyncio.create_task(background_runnable.run_main())
    # asyncio.create_task(BackgroundRunnable().initialize(instance_manager).run_main())

deposit_app.include_router(deposit_Router)