处理 sqlalchemy 断开连接的更好方法

Better approach to handling sqlalchemy disconnects

我们一直在试验 sqlalchemy 的断开连接处理,以及它如何与 ORM 集成。我们研究了文档,建议似乎是捕获断开连接异常,发出 rollback() 并重试代码。

例如:

import sqlalchemy as SA

retry = 2
while retry:
    retry -= 1
    try:
        for name in session.query(Names):
            print name
        break
    except SA.exc.DBAPIError as exc:
        if retry and exc.connection_invalidated:
            session.rollback()
        else:
            raise

我遵循基本原理——您必须回滚所有活动事务并重播它们以确保您的操作顺序一致。

但是——这意味着要向每个要处理数据的函数添加大量额外代码。此外,在 SELECT 的情况下,我们没有修改数据, rollback/re-request 的概念不仅难看,而且违反了 DRY 原则(不要重复自己)。

我想知道其他人是否介意分享他们如何使用 sqlalchemy 处理断开连接。

仅供参考:我们使用的是 sqlalchemy 0.9.8 和 Postgres 9.2.9

我喜欢的方法是将我所有的数据库代码放在一个 lambda 或闭包中,然后将其传递给一个辅助函数,该函数将处理捕获断开连接异常并重试。

以你的例子为例:

import sqlalchemy as SA

def main():
    def query():
        for name in session.query(Names):
            print name

    run_query(query)

def run_query(f, attempts=2):
    while attempts > 0:
        attempts -= 1
        try:
            return f() # "break" if query was successful and return any results
         except SA.exc.DBAPIError as exc:
            if attempts > 0 and exc.connection_invalidated:
                session.rollback()
            else:
                raise

您可以通过将一个布尔值传递给 run_query 来处理您只进行读取并因此希望重试而不回滚的情况。

这有助于您满足 DRY 原则,因为所有用于管理重试和回滚的丑陋样板代码都放在一个位置。

使用指数退避(https://github.com/litl/backoff):

@backoff.on_exception(
    backoff.expo,
    sqlalchemy.exc.DBAPIError,
    factor=7,
    max_tries=3,
    on_backoff=lambda details: LocalSession.get_main_sql_session().rollback(),
    on_giveup=lambda details: LocalSession.get_main_sql_session().flush(),  # flush the session
    logger=logging
)
def pessimistic_insertion(document_metadata):

    LocalSession.get_main_sql_session().add(document_metadata)
    LocalSession.get_main_sql_session().commit()

假设 LocalSession.get_main_sql_session() returns 是单身人士。