Why is Twisted's adbapi failing to recover data from within unittests?

Overview

Context

I am writing unit tests for some higher-order logic that depends on writing to an SQLite3 database. For this I am using twisted.trial.unittest and twisted.enterprise.adbapi.ConnectionPool.

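Problem statement

I am able to create a persistent sqlite3 database and store data in it. Using sqlitebrowser, I can verify that the data has been persisted as expected.

The issue is that calls to t.e.a.ConnectionPool.run* (e.g. runQuery) return an empty set of results, but only when called from within a TestCase.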

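Notes and significant details

The problem I am experiencing occurs only within Twisted's trial framework. My first attempt at debugging was to pull the database code out of the unit test and place it into an independent test/debug script. Said script works as expected, while the unit test code does not (see the examples below).

Case 1: misbehaving unit test

init.sql

This is the script used to initialize the database. There are no (apparent) errors stemming from this file.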

CREATE TABLE ajxp_changes ( seq INTEGER PRIMARY KEY AUTOINCREMENT, node_id NUMERIC, type TEXT, source TEXT, target TEXT, deleted_md5 TEXT );
CREATE TABLE ajxp_index ( node_id INTEGER PRIMARY KEY AUTOINCREMENT, node_path TEXT, bytesize NUMERIC, md5 TEXT, mtime NUMERIC, stat_result BLOB);
CREATE TABLE ajxp_last_buffer ( id INTEGER PRIMARY KEY AUTOINCREMENT, type TEXT, location TEXT, source TEXT, target TEXT );
CREATE TABLE ajxp_node_status ("node_id" INTEGER PRIMARY KEY  NOT NULL , "status" TEXT NOT NULL  DEFAULT 'NEW', "detail" TEXT);
CREATE TABLE events (id INTEGER PRIMARY KEY AUTOINCREMENT, type text, message text, source text, target text, action text, status text, date text);

CREATE TRIGGER LOG_DELETE AFTER DELETE ON ajxp_index BEGIN INSERT INTO ajxp_changes (node_id,source,target,type,deleted_md5) VALUES (old.node_id, old.node_path, "NULL", "delete", old.md5); END;
CREATE TRIGGER LOG_INSERT AFTER INSERT ON ajxp_index BEGIN INSERT INTO ajxp_changes (node_id,source,target,type) VALUES (new.node_id, "NULL", new.node_path, "create"); END;
CREATE TRIGGER LOG_UPDATE_CONTENT AFTER UPDATE ON "ajxp_index" FOR EACH ROW BEGIN INSERT INTO "ajxp_changes" (node_id,source,target,type) VALUES (new.node_id, old.node_path, new.node_path, CASE WHEN old.node_path = new.node_path THEN "content" ELSE "path" END);END;
CREATE TRIGGER STATUS_DELETE AFTER DELETE ON "ajxp_index" BEGIN DELETE FROM ajxp_node_status WHERE node_id=old.node_id; END;
CREATE TRIGGER STATUS_INSERT AFTER INSERT ON "ajxp_index" BEGIN INSERT INTO ajxp_node_status (node_id) VALUES (new.node_id); END;

CREATE INDEX changes_node_id ON ajxp_changes( node_id );
CREATE INDEX changes_type ON ajxp_changes( type );
CREATE INDEX changes_node_source ON ajxp_changes( source );
CREATE INDEX index_node_id ON ajxp_index( node_id );
CREATE INDEX index_node_path ON ajxp_index( node_path );
CREATE INDEX index_bytesize ON ajxp_index( bytesize );
CREATE INDEX index_md5 ON ajxp_index( md5 );
CREATE INDEX node_status_status ON ajxp_node_status( status );

test_sqlite.py

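This is the unit test class that fails unexpectedly. TestStateManagement.test_db_clean passes, indicating that the tables were properly created. TestStateManagement.test_inode_create_file fails, reporting that zero results were retrieved.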

import os.path as osp
from shutil import rmtree
from tempfile import mkdtemp

from twisted.internet import defer
from twisted.enterprise import adbapi
from twisted.trial.unittest import TestCase

import sqlengine # see below

class TestStateManagement(TestCase):

    def setUp(self):
        self.meta = mkdtemp()

        self.db = adbapi.ConnectionPool(
            "sqlite3", osp.join(self.meta, "db.sqlite"), check_same_thread=False,
        )
        self.stateman = sqlengine.StateManager(self.db)

        with open("init.sql") as f:
            script = f.read()

        self.d = self.db.runInteraction(lambda c, s: c.executescript(s), script)

    def tearDown(self):
        self.db.close()
        del self.db
        del self.stateman
        del self.d

        rmtree(self.meta)

    @defer.inlineCallbacks
    def test_db_clean(self):
        """Canary test to ensure that the db is initialized in a blank state"""

        yield self.d  # wait for db to be initialized

        q = "SELECT name FROM sqlite_master WHERE type='table' AND name=?;"
        for table in ("ajxp_index", "ajxp_changes"):
            res = yield self.db.runQuery(q, (table,))
            self.assertTrue(
                len(res) == 1,
                "table {0} does not exist".format(table)
            )

    @defer.inlineCallbacks
    def test_inode_create_file(self):
        yield self.d

        path = osp.join(self.ws, "test.txt")
        with open(path, "wt") as f:
            pass

        inode = mk_dummy_inode(path)
        yield self.stateman.create(inode, directory=False)

        entry = yield self.db.runQuery("SELECT * FROM ajxp_index")
        emsg = "got {0} results, expected 1.  Are canary tests failing?"
        lentry = len(entry)
        self.assertTrue(lentry == 1, emsg.format(lentry))

sqlengine.py

These are the artifacts under test by the unit tests above.

from twisted.logger import Logger


def values_as_tuple(d, *param):
    """Return the values for each key in `param` as a tuple"""
    return tuple(map(d.get, param))


class StateManager:
    """Manages the SQLite database's state, ensuring that it reflects the state
    of the filesystem.
    """

    log = Logger()

    def __init__(self, db):
        self._db = db

    def create(self, inode, directory=False):
        params = values_as_tuple(
            inode, "node_path", "bytesize", "md5", "mtime", "stat_result"
        )

        directive = (
            "INSERT INTO ajxp_index (node_path,bytesize,md5,mtime,stat_result) "
            "VALUES (?,?,?,?,?);"
        )

        return self._db.runOperation(directive, params)

Case 2: the bug disappears outside of twisted.trial

#! /usr/bin/env python

import os.path as osp
from tempfile import mkdtemp

from twisted.enterprise import adbapi
from twisted.internet.task import react
from twisted.internet.defer import inlineCallbacks

INIT_FILE = "example.sql"


def values_as_tuple(d, *param):
    """Return the values for each key in `param` as a tuple"""
    return tuple(map(d.get, param))


def create(db, inode):
    params = values_as_tuple(
        inode, "node_path", "bytesize", "md5", "mtime", "stat_result"
    )

    directive = (
        "INSERT INTO ajxp_index (node_path,bytesize,md5,mtime,stat_result) "
        "VALUES (?,?,?,?,?);"
    )

    return db.runOperation(directive, params)


def init_database(db):
    with open(INIT_FILE) as f:
        script = f.read()

    return db.runInteraction(lambda c, s: c.executescript(s), script)


@react
@inlineCallbacks
def main(reactor):
    meta = mkdtemp()
    db = adbapi.ConnectionPool(
        "sqlite3", osp.join(meta, "db.sqlite"), check_same_thread=False,
    )

    yield init_database(db)

    # Let's make sure the tables were created as expected and that we're
    # starting from a blank slate
    res = yield db.runQuery("SELECT * FROM ajxp_index LIMIT 1")
    assert not res, "database is not empty [ajxp_index]"

    res = yield db.runQuery("SELECT * FROM ajxp_changes LIMIT 1")
    assert not res, "database is not empty [ajxp_changes]"

    # The details of this are not important.  Suffice to say they (should)
    # conform to the DB schema for ajxp_index.
    test_data = {
        "node_path": "/this/is/some/arbitrary/path.ext",
        "bytesize": 0,
        "mtime": 179273.0,
        "stat_result": b"this simulates a blob of raw binary data",
        "md5": "d41d8cd98f00b204e9800998ecf8427e",  # arbitrary
    }

    # store the test data in the ajxp_index table
    yield create(db, test_data)

    # test if the entry exists in the db
    entry = yield db.runQuery("SELECT * FROM ajxp_index")
    assert len(entry) == 1, "got {0} results, expected 1".format(len(entry))

    print("OK")

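Closing remarks

Again, upon inspection with sqlitebrowser, the data does seem to be written to db.sqlite, so this looks like a retrieval problem. From here, I'm a bit stumped... any ideas?

Edit

This code will produce an inode that can be used for testing: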

import os.path as osp
from os import stat
from pickle import dumps


def mk_dummy_inode(path, isdir=False):
    return {
        "node_path": path,
        "bytesize": osp.getsize(path),
        "mtime": osp.getmtime(path),
        "stat_result": dumps(stat(path), protocol=4),
        "md5": "directory" if isdir else "d41d8cd98f00b204e9800998ecf8427e",
    }

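If you take a look at your setUp function, it calls self.db.runInteraction(...), which returns a Deferred. As you've noted, you assume that it waits for the Deferred to finish. That is not the case, however, and it's a trap that most people fall into (myself included). To be honest, in situations like this, especially for unit tests, I just run synchronous code outside the TestCase class to initialize the database. For example: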

import sqlite3


def init_db():
    """Create the schema synchronously, before any test runs."""
    conn = sqlite3.connect('db.sqlite')
    try:
        with open("init.sql") as f:
            conn.executescript(f.read())
    finally:
        conn.close()


init_db()     # call outside test case


class TestStateManagement(TestCase):
    """
    My test cases
    """

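Alternatively, you could decorate setUp with inlineCallbacks and yield runOperation(...), but something tells me that it wouldn't work... In any case, it's surprising that no errors were raised.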

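PS

I've been following this question for a while now, and it has been on my mind for days. It's nearly 1 AM and a potential cause has finally dawned on me. However, I'm too tired/lazy to actually test it :D but it's a pretty good hunch. I'd like to commend you on the level of detail in this question.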

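Well, it turns out this is a somewhat tricky problem. Running the tests in isolation (as posted in this question) makes the bug occur only rarely. However, when run in the context of the entire test suite, it fails almost 100% of the time.

I added yield task.deferLater(reactor, .00001, lambda: None) after writing to the database and before reading from it, and this resolved the problem.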

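From there, I suspected that this might be a race condition stemming from the connection pool and sqlite's limited tolerance for concurrency. I tried setting the cp_min and cp_max arguments of ConnectionPool to 1, and this also resolved the problem.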

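In short: it seems that sqlite does not play well with multiple connections, and the appropriate fix is to avoid concurrency to the extent possible.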