从 Google App Engine NDB 中删除大量实体

Deleting massive of entities from Google App Engine NDB

前面的人在我们的 Google App Engine 应用程序中制造了一些问题。目前,该应用正在保存具有 NULL 值的实体,但如果我们能够清除所有这些值会更好。

这里是 ndb.Modal:

class Day(ndb.Model):
    date = ndb.DateProperty(required=True, indexed=True)
    items = ndb.StringProperty(repeated=True, indexed=False)
    reason = ndb.StringProperty(name="cancelled", indexed=False)

    is_hole = ndb.ComputedProperty(lambda s: not bool(s.items or s.reason))

不知何故,我们需要删除所有 Day,其中 is_holetrue

大约有 4 000 000 个实体,其中应该在服务器上删除大约 2 000 000 个实体。

到目前为止的代码

我认为最好先使用以下代码计算我们应该删除多少个实体:

count = Day.query(Day.is_hole != False).count(10000)

这(限制为 10 000)需要大约 5 秒才能 运行。如果没有限制,它会出现 DeadLineException.

为了删除,我试过这个代码:

ndb.delete_multi([key for key in Day.query(Day.is_hole != False).fetch(10000, keys_only=True)])

这(有限制)大约需要 30 秒。

问题

如何更快 删除 is_hole != False 中的所有 Day

(我们正在使用 Python)

不,没有更快的删除实体的方法 - 截止日期是固定的。

不过也有一些技巧。

  1. 如果你要使用,你可以延长截止日期https://cloud.google.com/appengine/docs/python/taskqueue/你可以将一些任务放入队列中,在第一个任务(重复)之后生成下一个任务。
  2. 另一个类似于任务队列的选项是在删除一些坏记录后重定向到正在删除的同一个处理程序,同时删除最后一条记录。需要打开浏览器到最后。
if at_least_one_bad_record:
  delete_some_records (not longer than 30s)
  spawn again this task or redirect to this handler (next call will have next 30s)

记住,如果没有更多好的记录,它有退出点。它将删除所有匹配的记录,无需再次点击。

最好的方法是使用 MapReduce,它将 运行 在任务队列中,您还可以进行分片以并行工作。这是 python 代码。如果您需要任何说明,请告诉我

main.py

from mapreduce import base_handler
from mapreduce import mapreduce_pipeline
from mapreduce import operation as op
from mapreduce.input_readers import InputReader
from google.appengine.api import app_identity

def deleteEntity(entity):
    yield op.db.Delete(entity)

class DeleteEntitiesPipeline(base_handler.PipelineBase):
    def run(self):
        bucket_name = (app_identity.get_default_gcs_bucket_name())
        yield mapreduce_pipeline.MapPipeline(
                "job_name", 
                "main.deleteEntity", 
                "mapreduce.input_readers.DatastoreInputReader", 
                params={
                    "entity_kind": 'models.Day',
                    "filters": [("is_hole", "=", True)],
                    "bucket_name": bucket_name
                },
                shards=5)

class StartDelete(webapp2.RequestHandler):
    def get(self):
        pipeline = DeleteEntitiesPipeline()
        pipeline.start()

application = webapp2.WSGIApplication([
    ('/deleteentities', StartDelete),
], debug=True)