为什么 Twisted 的 adbapi 无法从单元测试中恢复数据?
Why is Twisted's adbapi failing to recover data from within unittests?
概览
上下文
我正在为一些依赖于写入 SQLite3 数据库的高阶逻辑编写单元测试。为此,我使用 twisted.trial.unittest
和 twisted.enterprise.adbapi.ConnectionPool
.
问题陈述
我能够创建持久性 sqlite3 数据库并在其中存储数据。使用 sqlitebrowser,我能够验证数据是否已按预期保留。
问题是调用 t.e.a.ConnectionPool.run*
(例如:runQuery
)return 一组空结果,但仅当从 TestCase
中调用时才会如此。
注释和重要细节
我遇到的问题只发生在 Twisted 的 trial
框架内。我的第一次调试尝试是将数据库代码从单元测试中拉出来,并将其放入一个独立的 test/debug 脚本中。所述脚本按预期工作,而单元测试代码不工作(参见下面的示例)。
案例 1:单元测试行为不当
init.sql
这是用于初始化数据库的脚本。此文件没有(明显的)错误。
CREATE TABLE ajxp_changes ( seq INTEGER PRIMARY KEY AUTOINCREMENT, node_id NUMERIC, type TEXT, source TEXT, target TEXT, deleted_md5 TEXT );
CREATE TABLE ajxp_index ( node_id INTEGER PRIMARY KEY AUTOINCREMENT, node_path TEXT, bytesize NUMERIC, md5 TEXT, mtime NUMERIC, stat_result BLOB);
CREATE TABLE ajxp_last_buffer ( id INTEGER PRIMARY KEY AUTOINCREMENT, type TEXT, location TEXT, source TEXT, target TEXT );
CREATE TABLE ajxp_node_status ("node_id" INTEGER PRIMARY KEY NOT NULL , "status" TEXT NOT NULL DEFAULT 'NEW', "detail" TEXT);
CREATE TABLE events (id INTEGER PRIMARY KEY AUTOINCREMENT, type text, message text, source text, target text, action text, status text, date text);
CREATE TRIGGER LOG_DELETE AFTER DELETE ON ajxp_index BEGIN INSERT INTO ajxp_changes (node_id,source,target,type,deleted_md5) VALUES (old.node_id, old.node_path, "NULL", "delete", old.md5); END;
CREATE TRIGGER LOG_INSERT AFTER INSERT ON ajxp_index BEGIN INSERT INTO ajxp_changes (node_id,source,target,type) VALUES (new.node_id, "NULL", new.node_path, "create"); END;
CREATE TRIGGER LOG_UPDATE_CONTENT AFTER UPDATE ON "ajxp_index" FOR EACH ROW BEGIN INSERT INTO "ajxp_changes" (node_id,source,target,type) VALUES (new.node_id, old.node_path, new.node_path, CASE WHEN old.node_path = new.node_path THEN "content" ELSE "path" END);END;
CREATE TRIGGER STATUS_DELETE AFTER DELETE ON "ajxp_index" BEGIN DELETE FROM ajxp_node_status WHERE node_id=old.node_id; END;
CREATE TRIGGER STATUS_INSERT AFTER INSERT ON "ajxp_index" BEGIN INSERT INTO ajxp_node_status (node_id) VALUES (new.node_id); END;
CREATE INDEX changes_node_id ON ajxp_changes( node_id );
CREATE INDEX changes_type ON ajxp_changes( type );
CREATE INDEX changes_node_source ON ajxp_changes( source );
CREATE INDEX index_node_id ON ajxp_index( node_id );
CREATE INDEX index_node_path ON ajxp_index( node_path );
CREATE INDEX index_bytesize ON ajxp_index( bytesize );
CREATE INDEX index_md5 ON ajxp_index( md5 );
CREATE INDEX node_status_status ON ajxp_node_status( status );
test_sqlite.py
这是意外失败的单元测试 class。 TestStateManagement.test_db_clean
通过,表示表已正确创建。 TestStateManagement.test_inode_create
失败,报告检索到零个结果。
import os.path as osp
from twisted.internet import defer
from twisted.enterprise import adbapi
import sqlengine # see below
class TestStateManagement(TestCase):
def setUp(self):
self.meta = mkdtemp()
self.db = adbapi.ConnectionPool(
"sqlite3", osp.join(self.meta, "db.sqlite"), check_same_thread=False,
)
self.stateman = sqlengine.StateManager(self.db)
with open("init.sql") as f:
script = f.read()
self.d = self.db.runInteraction(lambda c, s: c.executescript(s), script)
def tearDown(self):
self.db.close()
del self.db
del self.stateman
del self.d
rmtree(self.meta)
@defer.inlineCallbacks
def test_db_clean(self):
"""Canary test to ensure that the db is initialized in a blank state"""
yield self.d # wait for db to be initialized
q = "SELECT name FROM sqlite_master WHERE type='table' AND name=?;"
for table in ("ajxp_index", "ajxp_changes"):
res = yield self.db.runQuery(q, (table,))
self.assertTrue(
len(res) == 1,
"table {0} does not exist".format(table)
)
@defer.inlineCallbacks
def test_inode_create_file(self):
yield self.d
path = osp.join(self.ws, "test.txt")
with open(path, "wt") as f:
pass
inode = mk_dummy_inode(path)
yield self.stateman.create(inode, directory=False)
entry = yield self.db.runQuery("SELECT * FROM ajxp_index")
emsg = "got {0} results, expected 1. Are canary tests failing?"
lentry = len(entry)
self.assertTrue(lentry == 1, emsg.format(lentry))
sqlengine.py
这些是上述单元测试正在测试的工件。
def values_as_tuple(d, *param):
"""Return the values for each key in `param` as a tuple"""
return tuple(map(d.get, param))
class StateManager:
"""Manages the SQLite database's state, ensuring that it reflects the state
of the filesystem.
"""
log = Logger()
def __init__(self, db):
self._db = db
def create(self, inode, directory=False):
params = values_as_tuple(
inode, "node_path", "bytesize", "md5", "mtime", "stat_result"
)
directive = (
"INSERT INTO ajxp_index (node_path,bytesize,md5,mtime,stat_result) "
"VALUES (?,?,?,?,?);"
)
return self._db.runOperation(directive, params)
案例 2:错误在 twisted.trial
之外消失
#! /usr/bin/env python
import os.path as osp
from tempfile import mkdtemp
from twisted.enterprise import adbapi
from twisted.internet.task import react
from twisted.internet.defer import inlineCallbacks
INIT_FILE = "example.sql"
def values_as_tuple(d, *param):
"""Return the values for each key in `param` as a tuple"""
return tuple(map(d.get, param))
def create(db, inode):
params = values_as_tuple(
inode, "node_path", "bytesize", "md5", "mtime", "stat_result"
)
directive = (
"INSERT INTO ajxp_index (node_path,bytesize,md5,mtime,stat_result) "
"VALUES (?,?,?,?,?);"
)
return db.runOperation(directive, params)
def init_database(db):
with open(INIT_FILE) as f:
script = f.read()
return db.runInteraction(lambda c, s: c.executescript(s), script)
@react
@inlineCallbacks
def main(reactor):
meta = mkdtemp()
db = adbapi.ConnectionPool(
"sqlite3", osp.join(meta, "db.sqlite"), check_same_thread=False,
)
yield init_database(db)
# Let's make sure the tables were created as expected and that we're
# starting from a blank slate
res = yield db.runQuery("SELECT * FROM ajxp_index LIMIT 1")
assert not res, "database is not empty [ajxp_index]"
res = yield db.runQuery("SELECT * FROM ajxp_changes LIMIT 1")
assert not res, "database is not empty [ajxp_changes]"
# The details of this are not important. Suffice to say they (should)
# conform to the DB schema for ajxp_index.
test_data = {
"node_path": "/this/is/some/arbitrary/path.ext",
"bytesize": 0,
"mtime": 179273.0,
"stat_result": b"this simulates a blob of raw binary data",
"md5": "d41d8cd98f00b204e9800998ecf8427e", # arbitrary
}
# store the test data in the ajxp_index table
yield create(db, test_data)
# test if the entry exists in the db
entry = yield db.runQuery("SELECT * FROM ajxp_index")
assert len(entry) == 1, "got {0} results, expected 1".format(len(entry))
print("OK")
结束语
同样,在使用 sqlitebrowser 检查时,数据似乎正在写入 db.sqlite
,因此这看起来像是一个 检索 问题。从这里开始,我有点难过……有什么想法吗?
编辑
此代码将生成可用于测试的 inode
。
def mk_dummy_inode(path, isdir=False):
return {
"node_path": path,
"bytesize": osp.getsize(path),
"mtime": osp.getmtime(path),
"stat_result": dumps(stat(path), protocol=4),
"md5": "directory" if isdir else "d41d8cd98f00b204e9800998ecf8427e",
}
如果您查看 setUp
函数,您将返回 self.db.runInteraction(...)
,其中 returns 已延迟。正如您所指出的,您假设它等待延迟完成。然而,情况并非如此,这是大多数人(包括我自己)的陷阱。老实说,对于这种情况,尤其是单元测试,我只是在 TestCase
class 之外执行同步代码来初始化数据库。例如:
def init_db():
import sqlite3
conn = sqlite3.connect('db.sqlite')
c = conn.cursor()
with open("init.sql") as f:
c.executescript(f.read())
init_db() # call outside test case
class TestStateManagement(TestCase):
"""
My test cases
"""
或者,您可以装饰设置 yield runOperation(...)
但有些事情告诉我它不起作用...无论如何,令人惊讶的是没有出现任何错误。
PS
我关注这个问题已经有一段时间了,现在它已经在我脑海中萦绕了好几天了。将近凌晨 1 点,我终于明白了一个潜在的原因。但是,我太 tired/lazy 无法实际测试它 :D 但这是一个非常好的预感。我想赞扬你在这个问题上的详细程度。
好吧,原来这是一个有点棘手的问题。 运行 孤立的测试(如发布到此问题)使得错误很少发生。但是,当 运行 在整个测试套件的上下文中时,几乎 100% 的时间都会失败。
我在写入数据库之后和从数据库读取之前添加了 yield task.deferLater(reactor, .00001, lambda: None)
,这解决了问题。
从那里,我怀疑这可能是连接池和 sqlite 有限的并发容忍度引起的竞争条件。我尝试将 cb_min
和 cb_max
参数设置为 ConnectionPool
到 1
,这也解决了问题。
简而言之:似乎 sqlite 不能很好地处理多个连接,适当的解决方法是尽可能避免并发。
概览
上下文
我正在为一些依赖于写入 SQLite3 数据库的高阶逻辑编写单元测试。为此,我使用 twisted.trial.unittest
和 twisted.enterprise.adbapi.ConnectionPool
.
问题陈述
我能够创建持久性 sqlite3 数据库并在其中存储数据。使用 sqlitebrowser,我能够验证数据是否已按预期保留。
问题是调用 t.e.a.ConnectionPool.run*
(例如:runQuery
)return 一组空结果,但仅当从 TestCase
中调用时才会如此。
注释和重要细节
我遇到的问题只发生在 Twisted 的 trial
框架内。我的第一次调试尝试是将数据库代码从单元测试中拉出来,并将其放入一个独立的 test/debug 脚本中。所述脚本按预期工作,而单元测试代码不工作(参见下面的示例)。
案例 1:单元测试行为不当
init.sql
这是用于初始化数据库的脚本。此文件没有(明显的)错误。
CREATE TABLE ajxp_changes ( seq INTEGER PRIMARY KEY AUTOINCREMENT, node_id NUMERIC, type TEXT, source TEXT, target TEXT, deleted_md5 TEXT );
CREATE TABLE ajxp_index ( node_id INTEGER PRIMARY KEY AUTOINCREMENT, node_path TEXT, bytesize NUMERIC, md5 TEXT, mtime NUMERIC, stat_result BLOB);
CREATE TABLE ajxp_last_buffer ( id INTEGER PRIMARY KEY AUTOINCREMENT, type TEXT, location TEXT, source TEXT, target TEXT );
CREATE TABLE ajxp_node_status ("node_id" INTEGER PRIMARY KEY NOT NULL , "status" TEXT NOT NULL DEFAULT 'NEW', "detail" TEXT);
CREATE TABLE events (id INTEGER PRIMARY KEY AUTOINCREMENT, type text, message text, source text, target text, action text, status text, date text);
CREATE TRIGGER LOG_DELETE AFTER DELETE ON ajxp_index BEGIN INSERT INTO ajxp_changes (node_id,source,target,type,deleted_md5) VALUES (old.node_id, old.node_path, "NULL", "delete", old.md5); END;
CREATE TRIGGER LOG_INSERT AFTER INSERT ON ajxp_index BEGIN INSERT INTO ajxp_changes (node_id,source,target,type) VALUES (new.node_id, "NULL", new.node_path, "create"); END;
CREATE TRIGGER LOG_UPDATE_CONTENT AFTER UPDATE ON "ajxp_index" FOR EACH ROW BEGIN INSERT INTO "ajxp_changes" (node_id,source,target,type) VALUES (new.node_id, old.node_path, new.node_path, CASE WHEN old.node_path = new.node_path THEN "content" ELSE "path" END);END;
CREATE TRIGGER STATUS_DELETE AFTER DELETE ON "ajxp_index" BEGIN DELETE FROM ajxp_node_status WHERE node_id=old.node_id; END;
CREATE TRIGGER STATUS_INSERT AFTER INSERT ON "ajxp_index" BEGIN INSERT INTO ajxp_node_status (node_id) VALUES (new.node_id); END;
CREATE INDEX changes_node_id ON ajxp_changes( node_id );
CREATE INDEX changes_type ON ajxp_changes( type );
CREATE INDEX changes_node_source ON ajxp_changes( source );
CREATE INDEX index_node_id ON ajxp_index( node_id );
CREATE INDEX index_node_path ON ajxp_index( node_path );
CREATE INDEX index_bytesize ON ajxp_index( bytesize );
CREATE INDEX index_md5 ON ajxp_index( md5 );
CREATE INDEX node_status_status ON ajxp_node_status( status );
test_sqlite.py
这是意外失败的单元测试 class。 TestStateManagement.test_db_clean
通过,表示表已正确创建。 TestStateManagement.test_inode_create
失败,报告检索到零个结果。
import os.path as osp
from twisted.internet import defer
from twisted.enterprise import adbapi
import sqlengine # see below
class TestStateManagement(TestCase):
def setUp(self):
self.meta = mkdtemp()
self.db = adbapi.ConnectionPool(
"sqlite3", osp.join(self.meta, "db.sqlite"), check_same_thread=False,
)
self.stateman = sqlengine.StateManager(self.db)
with open("init.sql") as f:
script = f.read()
self.d = self.db.runInteraction(lambda c, s: c.executescript(s), script)
def tearDown(self):
self.db.close()
del self.db
del self.stateman
del self.d
rmtree(self.meta)
@defer.inlineCallbacks
def test_db_clean(self):
"""Canary test to ensure that the db is initialized in a blank state"""
yield self.d # wait for db to be initialized
q = "SELECT name FROM sqlite_master WHERE type='table' AND name=?;"
for table in ("ajxp_index", "ajxp_changes"):
res = yield self.db.runQuery(q, (table,))
self.assertTrue(
len(res) == 1,
"table {0} does not exist".format(table)
)
@defer.inlineCallbacks
def test_inode_create_file(self):
yield self.d
path = osp.join(self.ws, "test.txt")
with open(path, "wt") as f:
pass
inode = mk_dummy_inode(path)
yield self.stateman.create(inode, directory=False)
entry = yield self.db.runQuery("SELECT * FROM ajxp_index")
emsg = "got {0} results, expected 1. Are canary tests failing?"
lentry = len(entry)
self.assertTrue(lentry == 1, emsg.format(lentry))
sqlengine.py
这些是上述单元测试正在测试的工件。
def values_as_tuple(d, *param):
"""Return the values for each key in `param` as a tuple"""
return tuple(map(d.get, param))
class StateManager:
"""Manages the SQLite database's state, ensuring that it reflects the state
of the filesystem.
"""
log = Logger()
def __init__(self, db):
self._db = db
def create(self, inode, directory=False):
params = values_as_tuple(
inode, "node_path", "bytesize", "md5", "mtime", "stat_result"
)
directive = (
"INSERT INTO ajxp_index (node_path,bytesize,md5,mtime,stat_result) "
"VALUES (?,?,?,?,?);"
)
return self._db.runOperation(directive, params)
案例 2:错误在 twisted.trial
之外消失
#! /usr/bin/env python
import os.path as osp
from tempfile import mkdtemp
from twisted.enterprise import adbapi
from twisted.internet.task import react
from twisted.internet.defer import inlineCallbacks
INIT_FILE = "example.sql"
def values_as_tuple(d, *param):
"""Return the values for each key in `param` as a tuple"""
return tuple(map(d.get, param))
def create(db, inode):
params = values_as_tuple(
inode, "node_path", "bytesize", "md5", "mtime", "stat_result"
)
directive = (
"INSERT INTO ajxp_index (node_path,bytesize,md5,mtime,stat_result) "
"VALUES (?,?,?,?,?);"
)
return db.runOperation(directive, params)
def init_database(db):
with open(INIT_FILE) as f:
script = f.read()
return db.runInteraction(lambda c, s: c.executescript(s), script)
@react
@inlineCallbacks
def main(reactor):
meta = mkdtemp()
db = adbapi.ConnectionPool(
"sqlite3", osp.join(meta, "db.sqlite"), check_same_thread=False,
)
yield init_database(db)
# Let's make sure the tables were created as expected and that we're
# starting from a blank slate
res = yield db.runQuery("SELECT * FROM ajxp_index LIMIT 1")
assert not res, "database is not empty [ajxp_index]"
res = yield db.runQuery("SELECT * FROM ajxp_changes LIMIT 1")
assert not res, "database is not empty [ajxp_changes]"
# The details of this are not important. Suffice to say they (should)
# conform to the DB schema for ajxp_index.
test_data = {
"node_path": "/this/is/some/arbitrary/path.ext",
"bytesize": 0,
"mtime": 179273.0,
"stat_result": b"this simulates a blob of raw binary data",
"md5": "d41d8cd98f00b204e9800998ecf8427e", # arbitrary
}
# store the test data in the ajxp_index table
yield create(db, test_data)
# test if the entry exists in the db
entry = yield db.runQuery("SELECT * FROM ajxp_index")
assert len(entry) == 1, "got {0} results, expected 1".format(len(entry))
print("OK")
结束语
同样,在使用 sqlitebrowser 检查时,数据似乎正在写入 db.sqlite
,因此这看起来像是一个 检索 问题。从这里开始,我有点难过……有什么想法吗?
编辑
此代码将生成可用于测试的 inode
。
def mk_dummy_inode(path, isdir=False):
return {
"node_path": path,
"bytesize": osp.getsize(path),
"mtime": osp.getmtime(path),
"stat_result": dumps(stat(path), protocol=4),
"md5": "directory" if isdir else "d41d8cd98f00b204e9800998ecf8427e",
}
如果您查看 setUp
函数,您将返回 self.db.runInteraction(...)
,其中 returns 已延迟。正如您所指出的,您假设它等待延迟完成。然而,情况并非如此,这是大多数人(包括我自己)的陷阱。老实说,对于这种情况,尤其是单元测试,我只是在 TestCase
class 之外执行同步代码来初始化数据库。例如:
def init_db():
import sqlite3
conn = sqlite3.connect('db.sqlite')
c = conn.cursor()
with open("init.sql") as f:
c.executescript(f.read())
init_db() # call outside test case
class TestStateManagement(TestCase):
"""
My test cases
"""
或者,您可以装饰设置 yield runOperation(...)
但有些事情告诉我它不起作用...无论如何,令人惊讶的是没有出现任何错误。
PS
我关注这个问题已经有一段时间了,现在它已经在我脑海中萦绕了好几天了。将近凌晨 1 点,我终于明白了一个潜在的原因。但是,我太 tired/lazy 无法实际测试它 :D 但这是一个非常好的预感。我想赞扬你在这个问题上的详细程度。
好吧,原来这是一个有点棘手的问题。 运行 孤立的测试(如发布到此问题)使得错误很少发生。但是,当 运行 在整个测试套件的上下文中时,几乎 100% 的时间都会失败。
我在写入数据库之后和从数据库读取之前添加了 yield task.deferLater(reactor, .00001, lambda: None)
,这解决了问题。
从那里,我怀疑这可能是连接池和 sqlite 有限的并发容忍度引起的竞争条件。我尝试将 cb_min
和 cb_max
参数设置为 ConnectionPool
到 1
,这也解决了问题。
简而言之:似乎 sqlite 不能很好地处理多个连接,适当的解决方法是尽可能避免并发。