如何在没有全局变量的芹菜任务中设置sqlalchemy会话
how to setup sqlalchemy session in celery tasks with no global variable
总结:我想在 celery 任务中使用 sqlalchemy 会话,而没有包含该会话的全局变量。
我在一个带有 celery 任务的项目中使用 SQLAlchemy,我有
目前,我有一个全局变量 'session' 与我的芹菜应用程序设置 (celery.py) 一起定义,并有一个工作信号来设置它。
session = scoped_session(sessionmaker())
@celeryd_init.connect
def configure_workers(sender=None, conf=None, **kwargs):
# load the application configuration
# db_uri = conf['db_uri']
engine = create_engine(db_uri)
session.configure(bind=engine)
在定义任务的模块中,我简单地导入 'session' 并使用它。任务使用自定义 class 定义,返回后关闭会话:
class DBTask(Task):
def after_return(self, *args, **kwargs):
session.remove()
但是效果很好:当使用 CELERY_ALWAYS_EAGER=True 进行单元测试时,不会配置会话。到目前为止我发现的唯一解决方案是在单元测试中 运行 任务时模拟 'session' 变量:
with mock.patch('celerymodule.tasks.session', self.session):
do_something.delay(...)
虽然它有效,但我不想那样做。
有没有什么方法可以设置一个会话,它不会是一个全局变量,它既适用于正常的异步行为,也适用于 CELERY_ALWAYS_EAGER=True 的工作人员?
关于 custom task classes 的官方文档中的答案就在我眼皮底下。
我修改了用于访问数据库的任务的自定义任务 class:
class DBTask(Task):
_session = None
def after_return(self, *args, **kwargs):
if self._session is not None:
self._session.remove()
@property
def session(self):
if self._session is None:
_, self._session = _get_engine_session(self.conf['db_uri'],
verbose=False)
return self._session
我这样定义我的任务:
@app.task(base=DBTask, bind=True)
def do_stuff_with_db(self, conf, some_arg):
self.conf = conf
thing = self.session.query(Thing).filter_by(arg=some_arg).first()
这样,SQLAlchemy 会话只会为每个 celery 工作进程创建一次,我不需要任何全局变量。
这解决了我的单元测试问题,因为 SQLAlchemy 会话设置现在独立于 celery workers。
总结:我想在 celery 任务中使用 sqlalchemy 会话,而没有包含该会话的全局变量。
我在一个带有 celery 任务的项目中使用 SQLAlchemy,我有
目前,我有一个全局变量 'session' 与我的芹菜应用程序设置 (celery.py) 一起定义,并有一个工作信号来设置它。
session = scoped_session(sessionmaker())
@celeryd_init.connect
def configure_workers(sender=None, conf=None, **kwargs):
# load the application configuration
# db_uri = conf['db_uri']
engine = create_engine(db_uri)
session.configure(bind=engine)
在定义任务的模块中,我简单地导入 'session' 并使用它。任务使用自定义 class 定义,返回后关闭会话:
class DBTask(Task):
def after_return(self, *args, **kwargs):
session.remove()
但是效果很好:当使用 CELERY_ALWAYS_EAGER=True 进行单元测试时,不会配置会话。到目前为止我发现的唯一解决方案是在单元测试中 运行 任务时模拟 'session' 变量:
with mock.patch('celerymodule.tasks.session', self.session):
do_something.delay(...)
虽然它有效,但我不想那样做。
有没有什么方法可以设置一个会话,它不会是一个全局变量,它既适用于正常的异步行为,也适用于 CELERY_ALWAYS_EAGER=True 的工作人员?
关于 custom task classes 的官方文档中的答案就在我眼皮底下。
我修改了用于访问数据库的任务的自定义任务 class:
class DBTask(Task):
_session = None
def after_return(self, *args, **kwargs):
if self._session is not None:
self._session.remove()
@property
def session(self):
if self._session is None:
_, self._session = _get_engine_session(self.conf['db_uri'],
verbose=False)
return self._session
我这样定义我的任务:
@app.task(base=DBTask, bind=True)
def do_stuff_with_db(self, conf, some_arg):
self.conf = conf
thing = self.session.query(Thing).filter_by(arg=some_arg).first()
这样,SQLAlchemy 会话只会为每个 celery 工作进程创建一次,我不需要任何全局变量。
这解决了我的单元测试问题,因为 SQLAlchemy 会话设置现在独立于 celery workers。