如何教 SQLAlchemy 从断开连接中恢复?

How can SQLAlchemy be taught to recover from a disconnect?

根据 http://docs.sqlalchemy.org/en/rel_0_9/core/pooling.html#disconnect-handling-pessimistic,如果连接池中的条目不再有效,则可以检测 SQLAlchemy 以重新连接。我创建了以下测试用例来对此进行测试:

import subprocess
from sqlalchemy import create_engine, event
from sqlalchemy import exc
from sqlalchemy.pool import Pool

@event.listens_for(Pool, "checkout")
def ping_connection(dbapi_connection, connection_record, connection_proxy):
    cursor = dbapi_connection.cursor()
    try:
        print "pinging server"
        cursor.execute("SELECT 1")
    except:
        print "raising disconnect error"
        raise exc.DisconnectionError()
    cursor.close()

engine = create_engine('postgresql://postgres@localhost/test')

connection = engine.connect()

subprocess.check_call(['psql', str(engine.url), '-c',
    "select pg_terminate_backend(pid) from pg_stat_activity " +
    "where pid <> pg_backend_pid() " +
    "and datname='%s';" % engine.url.database],
    stdout=subprocess.PIPE)

result = connection.execute("select 'OK'")
for row in result:
    print "Success!", " ".join(row)

但我没有恢复,而是收到此异常:

sqlalchemy.exc.OperationalError: (OperationalError) terminating connection due to administrator command
server closed the connection unexpectedly
        This probably means the server terminated abnormally
        before or while processing the request.

由于 "pinging server" 打印在终端上,因此可以安全地得出事件侦听器已附加的结论。如何教 SQLAlchemy 从断开连接中恢复?

看起来 checkout 方法 仅在您第一次从池中获得连接时调用 (例如您的 connection = engine.connect() 行)

如果您随后失去连接,则必须明确更换它,这样您就可以获取一个新的,然后重试 sql:

try:
    result = connection.execute("select 'OK'")
except sqlalchemy.exc.OperationalError:  # may need more exceptions here
    connection = engine.connect()  # grab a new connection
    result = connection.execute("select 'OK'")  # and retry

围绕 sql 的每一点做这将是一件痛苦的事情,因此您可以使用类似的方式包装数据库查询:

def db_execute(conn, query):
    try:
        result = conn.execute(query)
    except sqlalchemy.exc.OperationalError:  # may need more exceptions here (or trap all)
        conn = engine.connect()  # replace your connection
        result = conn.execute(query)  # and retry
    return result

以下:

result = db_execute(connection, "select 'OK'")

现在应该成功了。

另一种选择是同时监听 invalidate 方法,然后采取一些措施来替换您的连接。