如何在金字塔网络应用程序中手动提交 sqlalchemy 数据库事务?
How do I manually commit a sqlalchemy database transaction inside a pyramid web app?
我有一个 Pyramid 网络应用程序,在对 sqlalchemy 数据库进行更改后需要 运行 一个 Celery 任务。我知道我可以使用 request.tm.get().addAfterCommitHook() 来做到这一点。但是,这对我不起作用,因为我还需要在视图中使用 celery 任务的 task_id 。因此,在我对 Celery 任务调用 task.delay() 之前,我需要提交对数据库的更改。
zope.sqlalchemy 文档说我可以使用 transaction.commit() 手动提交。但是,这对我不起作用;芹菜任务 运行s 在更改提交到数据库之前,即使我在调用 task.delay()
之前调用了 transaction.commit()
我的金字塔视图代码如下所示:
ride=appstruct_to_ride(dbsession,appstruct)
dbsession.add(ride)
# Flush dbsession so ride gets an id assignment
dbsession.flush()
# Store ride id
ride_id=ride.id
log.info('Created ride {}'.format(ride_id))
# Commit ride to database
import transaction
transaction.commit()
# Queue a task to update ride's weather data
from ..processing.weather import update_ride_weather
update_weather_task=update_ride_weather.delay(ride_id)
url = self.request.route_url('rides')
return HTTPFound(
url,
content_type='application/json',
charset='',
text=json.dumps(
{'ride_id':ride_id,
'update_weather_task_id':update_weather_task.task_id}))
我的 celery 任务是这样的:
@celery.task(bind=True,ignore_result=False)
def update_ride_weather(self,ride_id, train_model=True):
from ..celery import session_factory
logger.debug('Received update weather task for ride {}'.format(ride_id))
dbsession=session_factory()
dbsession.expire_on_commit=False
with transaction.manager:
ride=dbsession.query(Ride).filter(Ride.id==ride_id).one()
芹菜任务失败,NoResultFound:
File "/app/cycling_data/processing/weather.py", line 478, in update_ride_weather
ride=dbsession.query(Ride).filter(Ride.id==ride_id).one()
File "/usr/local/lib/python3.8/site-packages/sqlalchemy/orm/query.py", line 3282, in one
raise orm_exc.NoResultFound("No row was found for one()")
事后检查数据库时,我发现记录实际上是在 celery 任务 运行 之后创建的,但失败了。所以这意味着 transaction.commit() 没有按预期提交 t运行saction,而是在视图返回后由 zope.sqlalchemy 机器自动提交更改。如何在我的视图代码中手动提交 t运行saction?
request.tm
由 pyramid_tm
定义,可以是线程本地 transaction.manager
对象或每个请求对象,具体取决于您如何配置 pyramid_tm
(看因为 pyramid_tm.manager_hook
在某处定义以确定正在使用哪个。
你的问题很棘手,因为无论你做什么都应该符合 pyramid_tm
以及它期望事情如何运作。具体来说,它计划围绕请求的生命周期来控制事务——尽早提交对于该事务来说不是一个好主意。 pyramid_tm
正在尝试帮助提供故障安全功能,以便在请求生命周期的任何地方发生任何故障时回滚整个请求 - 而不仅仅是在您的可调用视图中。
选项 1:
无论如何都要早点承诺。如果您要这样做,那么提交后的失败将无法回滚已提交的数据,因此您可以让请求部分提交。好吧,那是你的问题,所以答案是使用 request.tm.commit()
可能后跟 request.tm.begin()
来为任何后续更改启动一个新的。您还需要注意不要跨该边界共享 sqlalchemy 管理的对象,例如 request.user
等,因为它们需要 refreshed/merged 到新事务中(SQLAlchemy 的身份缓存不能信任从默认情况下不同的事务,因为这就是隔离级别的工作方式。
选项 2:
为您要提前提交的数据启动一个单独的事务。好吧,假设您没有使用 transaction.manager
或 scoped_session
之类的任何线程局部变量,那么您可以开始自己的事务并提交它,而无需触及 dbsession
由 pyramid_tm
。一些适用于 pyramid-cookiecutter-starter 项目结构的通用代码可能是:
from myapp.models import get_tm_session
tmp_tm = transaction.TransactionManager(explicit=True)
with tmp_tm:
dbsession_factory = request.registry['dbsession_factory']
tmp_dbsession = get_tm_session(dbsession_factory, tmp_tm)
# ... do stuff with tmp_dbsession that is committed in this with-statement
ride = appstruct_to_ride(tmp_dbsession, appstruct)
# do not use this ride object outside of the with-statement
tmp_dbsession.add(ride)
tmp_dbsession.flush()
ride_id = ride.id
# we are now committed so go ahead and start your background worker
update_weather_task = update_ride_weather.delay(ride_id)
# maybe you want the ride object outside of the tmp_dbsession
ride = dbsession.query(Ride).filter(Ride.id==ride_id).one()
return {...}
这还不错——就故障模式而言,这可能是您在不将 celery 挂接到 pyramid_tm-controlled dbsession 的情况下所能做的最好的事情。
我有一个 Pyramid 网络应用程序,在对 sqlalchemy 数据库进行更改后需要 运行 一个 Celery 任务。我知道我可以使用 request.tm.get().addAfterCommitHook() 来做到这一点。但是,这对我不起作用,因为我还需要在视图中使用 celery 任务的 task_id 。因此,在我对 Celery 任务调用 task.delay() 之前,我需要提交对数据库的更改。
zope.sqlalchemy 文档说我可以使用 transaction.commit() 手动提交。但是,这对我不起作用;芹菜任务 运行s 在更改提交到数据库之前,即使我在调用 task.delay()
之前调用了 transaction.commit()我的金字塔视图代码如下所示:
ride=appstruct_to_ride(dbsession,appstruct)
dbsession.add(ride)
# Flush dbsession so ride gets an id assignment
dbsession.flush()
# Store ride id
ride_id=ride.id
log.info('Created ride {}'.format(ride_id))
# Commit ride to database
import transaction
transaction.commit()
# Queue a task to update ride's weather data
from ..processing.weather import update_ride_weather
update_weather_task=update_ride_weather.delay(ride_id)
url = self.request.route_url('rides')
return HTTPFound(
url,
content_type='application/json',
charset='',
text=json.dumps(
{'ride_id':ride_id,
'update_weather_task_id':update_weather_task.task_id}))
我的 celery 任务是这样的:
@celery.task(bind=True,ignore_result=False)
def update_ride_weather(self,ride_id, train_model=True):
from ..celery import session_factory
logger.debug('Received update weather task for ride {}'.format(ride_id))
dbsession=session_factory()
dbsession.expire_on_commit=False
with transaction.manager:
ride=dbsession.query(Ride).filter(Ride.id==ride_id).one()
芹菜任务失败,NoResultFound:
File "/app/cycling_data/processing/weather.py", line 478, in update_ride_weather
ride=dbsession.query(Ride).filter(Ride.id==ride_id).one()
File "/usr/local/lib/python3.8/site-packages/sqlalchemy/orm/query.py", line 3282, in one
raise orm_exc.NoResultFound("No row was found for one()")
事后检查数据库时,我发现记录实际上是在 celery 任务 运行 之后创建的,但失败了。所以这意味着 transaction.commit() 没有按预期提交 t运行saction,而是在视图返回后由 zope.sqlalchemy 机器自动提交更改。如何在我的视图代码中手动提交 t运行saction?
request.tm
由 pyramid_tm
定义,可以是线程本地 transaction.manager
对象或每个请求对象,具体取决于您如何配置 pyramid_tm
(看因为 pyramid_tm.manager_hook
在某处定义以确定正在使用哪个。
你的问题很棘手,因为无论你做什么都应该符合 pyramid_tm
以及它期望事情如何运作。具体来说,它计划围绕请求的生命周期来控制事务——尽早提交对于该事务来说不是一个好主意。 pyramid_tm
正在尝试帮助提供故障安全功能,以便在请求生命周期的任何地方发生任何故障时回滚整个请求 - 而不仅仅是在您的可调用视图中。
选项 1:
无论如何都要早点承诺。如果您要这样做,那么提交后的失败将无法回滚已提交的数据,因此您可以让请求部分提交。好吧,那是你的问题,所以答案是使用 request.tm.commit()
可能后跟 request.tm.begin()
来为任何后续更改启动一个新的。您还需要注意不要跨该边界共享 sqlalchemy 管理的对象,例如 request.user
等,因为它们需要 refreshed/merged 到新事务中(SQLAlchemy 的身份缓存不能信任从默认情况下不同的事务,因为这就是隔离级别的工作方式。
选项 2:
为您要提前提交的数据启动一个单独的事务。好吧,假设您没有使用 transaction.manager
或 scoped_session
之类的任何线程局部变量,那么您可以开始自己的事务并提交它,而无需触及 dbsession
由 pyramid_tm
。一些适用于 pyramid-cookiecutter-starter 项目结构的通用代码可能是:
from myapp.models import get_tm_session
tmp_tm = transaction.TransactionManager(explicit=True)
with tmp_tm:
dbsession_factory = request.registry['dbsession_factory']
tmp_dbsession = get_tm_session(dbsession_factory, tmp_tm)
# ... do stuff with tmp_dbsession that is committed in this with-statement
ride = appstruct_to_ride(tmp_dbsession, appstruct)
# do not use this ride object outside of the with-statement
tmp_dbsession.add(ride)
tmp_dbsession.flush()
ride_id = ride.id
# we are now committed so go ahead and start your background worker
update_weather_task = update_ride_weather.delay(ride_id)
# maybe you want the ride object outside of the tmp_dbsession
ride = dbsession.query(Ride).filter(Ride.id==ride_id).one()
return {...}
这还不错——就故障模式而言,这可能是您在不将 celery 挂接到 pyramid_tm-controlled dbsession 的情况下所能做的最好的事情。