FreeTDS 和 PyODBC 与服务器的连接随机断开

FreeTDS and PyODBC Random Connection Disconnects from server

出于某种原因,以下堆栈跟踪在进程启动大约 16 小时或更长时间后首次出现,此后每隔一两个小时出现一次。我在其他服务器上还有按天、按小时以及每隔几分钟运行的其他作业,但它们都没有出现这个堆栈跟踪。我能看到的唯一区别是,这段代码在同一个调度器中注册了多个作业,而其他代码没有。

堆栈跟踪:

Traceback (most recent call last):
  File "app.py", line 24, in _run_client
    h.run_hourly(start, end)
  File "/home/foo/anaconda2/lib/python2.7/site-packages/foo/client/bar/helper.py", line 153, in run_hourly
    _run(_HOURLY_REQUEST, frequency, start, end, duration)
  File "/home/foo/anaconda2/lib/python2.7/site-packages/foo/client/bar/helper.py", line 117, in _run
    bars = dr.get_bars(frequency)
  File "/home/foo/anaconda2/lib/python2.7/site-packages/foo/client/bar/data_requests.py", line 35, in get_bars
    df = pd.read_sql(query, _ENGINE, params=params)
  File "/home/foo/anaconda2/lib/python2.7/site-packages/pandas/io/sql.py", line 415, in read_sql
chunksize=chunksize)
  File "/home/foo/anaconda2/lib/python2.7/site-packages/pandas/io/sql.py", line 1084, in read_query
    result = self.execute(*args)
  File "/home/foo/anaconda2/lib/python2.7/site-packages/pandas/io/sql.py", line 975, in execute
    return self.connectable.execute(*args, **kwargs)
  File "/home/foo/anaconda2/lib/python2.7/site-packages/sqlalchemy/engine/base.py", line 2055, in execute
    return connection.execute(statement, *multiparams, **params)
  File "/home/foo/anaconda2/lib/python2.7/site-packages/sqlalchemy/engine/base.py", line 945, in execute
    return meth(self, multiparams, params)
  File "/home/foo/anaconda2/lib/python2.7/site-packages/sqlalchemy/sql/elements.py", line 263, in _execute_on_connection
    return connection._execute_clauseelement(self, multiparams, params)
  File "/home/foo/anaconda2/lib/python2.7/site-packages/sqlalchemy/engine/base.py", line 1053, in _execute_clauseelement
compiled_sql, distilled_params
  File "/home/foo/anaconda2/lib/python2.7/site-packages/sqlalchemy/engine/base.py", line 1189, in _execute_context
context)
  File "/home/foo/anaconda2/lib/python2.7/site-packages/sqlalchemy/engine/base.py", line 1386, in _handle_dbapi_exception
    self._autorollback()
  File "/home/foo/anaconda2/lib/python2.7/site-packages/sqlalchemy/engine/base.py", line 824, in _autorollback
    self._root._rollback_impl()
  File "/home/foo/anaconda2/lib/python2.7/site-packages/sqlalchemy/engine/base.py", line 703, in _rollback_impl
    self._handle_dbapi_exception(e, None, None, None, None)
  File "/home/foo/anaconda2/lib/python2.7/site-packages/sqlalchemy/engine/base.py", line 1315, in _handle_dbapi_exception
    exc_info
  File "/home/foo/anaconda2/lib/python2.7/site-packages/sqlalchemy/util/compat.py", line 202, in raise_from_cause
    reraise(type(exception), exception, tb=exc_tb, cause=cause)
  File "/home/foo/anaconda2/lib/python2.7/site-packages/sqlalchemy/engine/base.py", line 701, in _rollback_impl
    self.engine.dialect.do_rollback(self.connection)
  File "/home/foo/anaconda2/lib/python2.7/site-packages/sqlalchemy/engine/default.py", line 439, in do_rollback
    dbapi_connection.rollback()
DBAPIError: (pyodbc.Error) ('08S01', '[08S01] [FreeTDS][SQL Server]Write to the server failed (20006) (SQLEndTran)')

各服务器上的软件版本一致,如下所示。

版本:
OS: Ubuntu 16.04.1
FreeTDS 包版本:freetds-common:0.91-6.1build1, tdsodbc:amd64:0.91-6.1build1
PyODBC:3.0.10
SQLAlchemy:1.1.4

tsql -C 输出:

Compile-time settings (established with the "configure" script)
                        Version: freetds v0.91
         freetds.conf directory: /etc/freetds
 MS db-lib source compatibility: no
    Sybase binary compatibility: yes
                  Thread safety: yes
                  iconv library: yes
                    TDS version: 4.2
                          iODBC: no
                       unixodbc: yes
          SSPI "trusted" logins: no
                       Kerberos: yes

测试代码:

app.py

from datetime import datetime, timedelta
from dateutil import relativedelta
import traceback

from apscheduler.schedulers.blocking import BlockingScheduler
from bar import helper as h


def _run_client(resolution):
    """Build a start/end window from the current UTC time and run the client.

    ``resolution`` selects the branch: ``"hourly"`` and ``"daily"`` are
    matched explicitly, and any other value is handled as monthly.

    * hourly: ``end`` is the top of the hour one hour ago; ``start`` is one
      hour before ``end``.
    * daily: ``end`` is midnight of yesterday's date; ``start`` is one day
      before ``end``.
    * monthly: ``end`` is the first day of the previous month; ``start`` is
      one month before ``end``.

    Both bounds are naive datetimes derived from ``datetime.utcnow()``.

    Any ``Exception`` raised while running is printed with its traceback and
    not re-raised, so the function itself always returns ``None``.
    """
    try:
        if resolution == "hourly":
            # Rebuilding the datetime from its fields truncates it to the
            # top of the hour and keeps it naive.
            t = datetime.utcnow() - timedelta(hours=1)
            end = datetime(t.year, t.month, t.day, t.hour)
            start = end - timedelta(hours=1)
            h.run_hourly(start, end)
        elif resolution == "daily":
            # Go through a date first, then rebuild a naive datetime at
            # midnight of that date.
            t = datetime.utcnow().date() - timedelta(days=1)
            end = datetime(t.year, t.month, t.day)
            start = end - timedelta(days=1)
            h.run_daily(start, end)
        else:
            # First day of the current month, moved back one month, then
            # rebuilt as a naive datetime at midnight.
            first_of_month = datetime.utcnow().date().replace(day=1)
            t = first_of_month - relativedelta.relativedelta(months=1)
            end = datetime(t.year, t.month, t.day)
            start = end - relativedelta.relativedelta(months=1)
            h.run_monthly(start, end)
    except Exception:
        # Catch Exception instead of using a bare ``except`` so that
        # KeyboardInterrupt and SystemExit can still stop the process.
        # The parenthesized single-argument print behaves the same on
        # Python 2 and Python 3.
        print("Current run failed:\n%s" % traceback.format_exc())


def _get_hourly_job(sched):
    """Add the hourly ``_run_client`` job to ``sched`` and return it.

    The job uses a cron trigger with ``hour="*"`` and ``minute="0"`` and is
    called with the single argument ``"hourly"``.
    """
    return sched.add_job(
        _run_client,
        trigger="cron",
        args=["hourly"],
        hour="*",
        minute="0",
    )


def _get_daily_job(sched):
    """Add the daily ``_run_client`` job to ``sched`` and return it.

    The job uses a cron trigger with ``hour="4"`` and ``minute="0"`` and is
    called with the single argument ``"daily"``.
    """
    return sched.add_job(
        _run_client,
        trigger="cron",
        args=["daily"],
        hour="4",
        minute="0",
    )


def _get_monthly_job(sched):
    """Add the monthly ``_run_client`` job to ``sched`` and return it.

    The job uses a cron trigger with ``day="1"``, ``hour="0"`` and
    ``minute="0"`` and is called with the single argument ``"monthly"``.
    """
    return sched.add_job(
        _run_client,
        trigger="cron",
        args=["monthly"],
        day="1",
        hour="0",
        minute="0",
    )

if __name__ == "__main__":
    # Register all three jobs on a single scheduler before starting it.
    sched = BlockingScheduler()
    hourly_job = _get_hourly_job(sched)
    daily_job = _get_daily_job(sched)
    monthly_job = _get_monthly_job(sched)

    try:
        sched.start()
    except (KeyboardInterrupt, SystemExit):
        # Only an interrupt or exit request is treated as a normal stop.
        # Any other exception now propagates with its traceback instead of
        # being silently swallowed by a bare ``except``.

        # Remove the jobs from memory since they finished
        hourly_job.remove()
        daily_job.remove()
        monthly_job.remove()

        sched.shutdown()

helper.py

from datetime import timedelta

import data_requests as dr

# Module-level request objects, one per resolution. Each is created once at
# import time, tagged with its resolution string, and passed to _run() by the
# matching run_* function in this module.
# NOTE(review): Foo is not defined or imported in the visible code -- confirm
# where it comes from.
_HOURLY_REQUEST = Foo()
_HOURLY_REQUEST.resolution = "hourly"

_DAILY_REQUEST = Foo()
_DAILY_REQUEST.resolution = "daily"

_MONTHLY_REQUEST = Foo()
_MONTHLY_REQUEST.resolution = "monthly"


def _run(request, frequency, start, end, duration):
    """Fetch the bars for ``frequency`` and print how many were returned.

    Nothing is printed when the result is empty. Only ``frequency`` is read
    here; ``request``, ``start``, ``end`` and ``duration`` are accepted but
    not used. Always returns ``None``.
    """
    found = dr.get_bars(frequency)
    if not found.empty:
        print("Bars = %i" % len(found))


def run_daily(start, end):
    """Forward ``start`` and ``end`` to ``_run`` with the daily settings.

    Uses the daily request object, a frequency of 86400 (presumably
    seconds, i.e. one day -- confirm) and a duration of four hours.
    """
    _run(_DAILY_REQUEST, 86400, start, end, timedelta(hours=4))


def run_hourly(start, end):
    """Forward ``start`` and ``end`` to ``_run`` with the hourly settings.

    Uses the hourly request object, a frequency of 3600 (presumably
    seconds, i.e. one hour -- confirm) and a duration of thirty minutes.
    """
    _run(_HOURLY_REQUEST, 3600, start, end, timedelta(minutes=30))


def run_monthly(start, end):
    """Forward ``start`` and ``end`` to ``_run`` with the monthly settings.

    Uses the monthly request object, a frequency of 1209600 (presumably
    seconds, which would be 14 days -- confirm) and a duration of one day.
    """
    _run(_MONTHLY_REQUEST, 1209600, start, end, timedelta(days=1))

data_requests.py

import pandas as pd
from sqlalchemy import create_engine, exc
from sqlalchemy.sql import text

# Connection URL for the mssql+pyodbc dialect. The query string points at the
# libtdsodbc.so ODBC driver library and requests TDS protocol version 7.2.
# NOTE(review): the credentials are embedded in this literal -- consider
# loading them from configuration or the environment instead.
_DB = "mssql+pyodbc://foo@stg-foo:FooBar@stg-foo.database.secure.windows.net:1433/foo?driver=/usr/lib/x86_64-linux-gnu/odbc/libtdsodbc.so&tds_version=7.2"
# Single module-level engine, created at import time and used by the queries
# in this module.
_ENGINE = create_engine(_DB)


def get_bars(frequency):
    """Query the foo/bar/test join for bars at or below ``frequency``.

    ``frequency`` is converted with ``int()`` and bound to the
    ``:frequency`` parameter, so only rows with
    ``bar.frequency <= frequency`` are joined in.

    Returns whatever ``pd.read_sql`` returns for the query, with the
    columns ``foo_id``, ``bar_id`` and ``timezone``.

    Raises ``sqlalchemy.exc.DBAPIError`` when the query fails and the
    error is not flagged as an invalidated connection, or when the single
    retry fails as well.
    """
    query = text("""SELECT h.foo_id, s.id AS bar_id, u.timezone
                FROM foo h
                INNER JOIN bar s ON h.foo_id = s.foo_id
                AND s.frequency <= :frequency
                INNER JOIN test u ON u.id = h.test_id""")
    params = {
        "frequency": int(frequency)
    }

    try:
        return pd.read_sql(query, _ENGINE, params=params)
    except exc.DBAPIError as e:
        if not e.connection_invalidated:
            # A bare ``raise`` keeps the original traceback; ``raise e``
            # would restart it at this line on Python 2.
            raise

    # The connection was invalid (e.g. the database restarted), so execute
    # the query one more time.
    return pd.read_sql(query, _ENGINE, params=params)

我也试过 pymssql,但其最新版本(2.1.3,搭配 FreeTDS 1.00.9)会出现段错误,而 pyodbc 不会。我认为这是驱动程序的问题,但一直没能解决。

此外,我按照 SQLAlchemy 文档中处理连接断开的方法编写了重试逻辑,但从堆栈跟踪来看,代码似乎并没有进入重试分支,而是走到了 else 块并抛出了错误。

我找到了错误的原因,看来终究还是代码的问题。花了很长时间才弄清楚,问题出在我通过 pandas 执行查询的方式上:直接把 Engine 实例传给 pandas 时,pandas 似乎不会关闭连接。下面是我用来解决问题的代码。

def _get_df(query, params=None):
    """Run ``query`` on an explicitly managed connection and return the result.

    Each attempt opens its own connection with ``_ENGINE.begin()`` and
    passes that connection, not the engine, to ``pd.read_sql``. The
    ``with`` block releases the connection when the read finishes.

    ``params`` is forwarded unchanged to ``pd.read_sql``.

    Raises ``sqlalchemy.exc.DBAPIError`` when the query fails and the
    error is not flagged as an invalidated connection, or when the single
    retry fails as well.
    """
    try:
        with _ENGINE.begin() as conn:
            return pd.read_sql(query, conn, params=params)
    except exc.DBAPIError as e:
        if not e.connection_invalidated:
            # A bare ``raise`` keeps the original traceback; ``raise e``
            # would restart it at this line on Python 2.
            raise

    # The connection was invalid (e.g. the database restarted), so execute
    # the query one more time on a fresh connection.
    with _ENGINE.begin() as conn:
        return pd.read_sql(query, conn, params=params)