ndb.toplevel 和 ndb.transactional 的互动
Interaction of ndb.toplevel and ndb.transactional
有没有可能让 @ndb.toplevel
和 @ndb.transactional
一起玩?
我想要实现的是一个包含 entity.put_async()
调用的交易,但方便的是不必显式地等待期货。 @ndb.toplevel
通常会这样做,但另一个 SO 问题似乎表明它不能与事务结合使用:"Does ndb toplevel break transactions?"
我无法在 App Engine 文档的任何地方找到明确记录的内容。我们可以重现该问题中显示的断言错误,但我们编写了一些测试来查看 put_async()
调用是否失败并且没有发现任何问题。但是,由于有可能丢失数据,因此最好从熟悉 ndb 的人那里得到更具体的答案。
我们简单的测试代码如下。
如果我们同时删除 ndb.toplevel
和 ndb.transactional
装饰器,测试将如预期的那样失败。
但是,如果我们只使用 ndb.transactional
装饰器而忽略 ndb.toplevel
装饰器,则测试会通过,这不是预期的。这让我担心 ndb.transactional
中的开销可能足以使 put_async()
调用有足够的时间完成,但没有任何保证,所以它可能会意外失败?
class AsyncTestModel(ndb.Model):
data = ndb.StringProperty(indexed=True)
@ndb.toplevel
def start_test():
for _ in range(100):
test()
# Check we wrote all the entities
time.sleep(30)
entities = AsyncTestModel.query().fetch()
assert(len(entities) == 1000)
@ndb.transactional(xg=True)
def test():
for _ in range(10):
x = AsyncTestModel()
x.data = make_random_string(1000)
x.put_async()
仅使用 @ndb.transactional
时测试通过的事实是预期的结果:为确保永远不会部分应用事务,@ndb.transactional
等待所有请求完成。
因此代码 for _ in range(100): test()
等待事务在每次迭代时结束。
因此以下测试通过:
@ndb.transactional(xg=True)
def test():
for _ in range(10):
x = AsyncTestModel()
x.data = ''.join(random.choice(string.ascii_uppercase + string.digits) for _ in range(1000))
x.put_async()
for x, _ in enumerate(range(100)):
test()
assert(AsyncTestModel.query().count() == (x + 1) * 10)
注意:如果您在事务中使用异步查询,您可以查看@ndb.transactional_async
(文档here)。
有没有可能让 @ndb.toplevel
和 @ndb.transactional
一起玩?
我想要实现的是一个包含 entity.put_async()
调用的交易,但方便的是不必显式地等待期货。 @ndb.toplevel
通常会这样做,但另一个 SO 问题似乎表明它不能与事务结合使用:"Does ndb toplevel break transactions?"
我无法在 App Engine 文档的任何地方找到明确记录的内容。我们可以重现该问题中显示的断言错误,但我们编写了一些测试来查看 put_async()
调用是否失败并且没有发现任何问题。但是,由于有可能丢失数据,因此最好从熟悉 ndb 的人那里得到更具体的答案。
我们简单的测试代码如下。
如果我们同时删除 ndb.toplevel
和 ndb.transactional
装饰器,测试将如预期的那样失败。
但是,如果我们只使用 ndb.transactional
装饰器而忽略 ndb.toplevel
装饰器,则测试会通过,这不是预期的。这让我担心 ndb.transactional
中的开销可能足以使 put_async()
调用有足够的时间完成,但没有任何保证,所以它可能会意外失败?
class AsyncTestModel(ndb.Model):
data = ndb.StringProperty(indexed=True)
@ndb.toplevel
def start_test():
for _ in range(100):
test()
# Check we wrote all the entities
time.sleep(30)
entities = AsyncTestModel.query().fetch()
assert(len(entities) == 1000)
@ndb.transactional(xg=True)
def test():
for _ in range(10):
x = AsyncTestModel()
x.data = make_random_string(1000)
x.put_async()
仅使用 @ndb.transactional
时测试通过的事实是预期的结果:为确保永远不会部分应用事务,@ndb.transactional
等待所有请求完成。
因此代码 for _ in range(100): test()
等待事务在每次迭代时结束。
因此以下测试通过:
@ndb.transactional(xg=True)
def test():
for _ in range(10):
x = AsyncTestModel()
x.data = ''.join(random.choice(string.ascii_uppercase + string.digits) for _ in range(1000))
x.put_async()
for x, _ in enumerate(range(100)):
test()
assert(AsyncTestModel.query().count() == (x + 1) * 10)
注意:如果您在事务中使用异步查询,您可以查看@ndb.transactional_async
(文档here)。