大量 NDB 实体的更新失败

Update for big count of NDB Entities fails

我的任务很简单。迁移并将新字段(重复和复合 属性)添加到现有 NDB 实体(~100K 实体)后,我需要为其设置默认值。

我先尝试了那个代码:

q = dm.E.query(ancestor=dm.E.root_key)
for user in q.iter(batch_size=500):
     user.field1 = [dm.E2()]
     user.put()

但失败并出现以下错误:

2015-04-25 20:41:44.792 /**** 500 599830ms 0kb AppEngine-Google; (+http://code.google.com/appengine) module=default version=1-17-0
W 2015-04-25 20:32:46.675 suspended generator run_to_queue(query.py:938) raised Timeout(The datastore operation timed out, or the data was temporarily unavailable.)
W 2015-04-25 20:32:46.676 suspended generator helper(context.py:876) raised Timeout(The datastore operation timed out, or the data was temporarily unavailable.)
E 2015-04-25 20:41:44.475 Traceback (most recent call last): File "/base/data/home/runtimes/python27/python27_lib/versions/1/google/appengine/runtime/wsgi.py", line 267, in

任务在单独的任务队列上运行,因此它至少有 10 分钟的时间来执行,但似乎还不够。另一件奇怪的事情:来自 NDB 的警告。可能存在死锁,因为来自其他实例的相同实体的更新(由用户启动)但不确定。

无论如何,我想知道此类任务的最佳做法(也是最简单的做法)。我知道 MapReduce,但目前它看起来过于复杂,无法完成此类任务。

更新:

我还尝试通过获取数组中的所有实体来使用 put_multi,但是 GAE 在实例超过 ~600 MB 内存(限制为 500 MB)时立即停止该实例。似乎内存不足以存储所有实体 (~100K)。

执行 _migrate_users() 后,它将处理 50 个用户,然后创建另一个任务来处理接下来的 50 个用户,依此类推。 根据实体的大小,您可以使用大于 50 的批量大小。

def _migrate_users(curs=None):
  users, next_curs, more = User.query().fetch_page(50, start_cursor=curs)
  for user in users:
    user.field1 = 'bla bla'
  ndb.put_multi(users)
  if more:
    deferred.defer(_migrate_users, next_curs, _queue='default')