如何在没有全局变量的芹菜任务中设置sqlalchemy会话

how to setup sqlalchemy session in celery tasks with no global variable

总结:我想在 celery 任务中使用 sqlalchemy 会话,而没有包含该会话的全局变量。

我在一个带有 celery 任务的项目中使用 SQLAlchemy,我有

目前,我有一个全局变量 'session' 与我的芹菜应用程序设置 (celery.py) 一起定义,并有一个工作信号来设置它。

session = scoped_session(sessionmaker())

@celeryd_init.connect
def configure_workers(sender=None, conf=None, **kwargs):
    # load the application configuration
    # db_uri = conf['db_uri']
    engine = create_engine(db_uri)
    session.configure(bind=engine)

在定义任务的模块中,我简单地导入 'session' 并使用它。任务使用自定义 class 定义,返回后关闭会话:

class DBTask(Task):
    def after_return(self, *args, **kwargs):
        session.remove()

但是效果很好:当使用 CELERY_ALWAYS_EAGER=True 进行单元测试时,不会配置会话。到目前为止我发现的唯一解决方案是在单元测试中 运行 任务时模拟 'session' 变量:

with mock.patch('celerymodule.tasks.session', self.session):
    do_something.delay(...)

虽然它有效,但我不想那样做。

有没有什么方法可以设置一个会话,它不会是一个全局变量,它既适用于正常的异步行为,也适用于 CELERY_ALWAYS_EAGER=True 的工作人员?

关于 custom task classes 的官方文档中的答案就在我眼皮底下。

我修改了用于访问数据库的任务的自定义任务 class:

class DBTask(Task):
    _session = None

    def after_return(self, *args, **kwargs):
        if self._session is not None:
            self._session.remove()

    @property
    def session(self):
        if self._session is None:
            _, self._session = _get_engine_session(self.conf['db_uri'],
                                                   verbose=False)

        return self._session

我这样定义我的任务:

@app.task(base=DBTask, bind=True)
def do_stuff_with_db(self, conf, some_arg):
    self.conf = conf
    thing = self.session.query(Thing).filter_by(arg=some_arg).first()

这样,SQLAlchemy 会话只会为每个 celery 工作进程创建一次,我不需要任何全局变量。

这解决了我的单元测试问题,因为 SQLAlchemy 会话设置现在独立于 celery workers。