在 Google Cloud Datastore 上每 insert/delete 之后是否有回调?

Is there anyway to get a callback after every insert/delete on Google Cloud Datastore?

我想将我的 Cloud Datastore 内容与 ElasticSearch 中的索引同步。我希望 ES 索引始终与 Datastore 的内容保持同步。

我注意到 Appengine Python 标准环境中提供了一种等效机制,方法是在数据存储区 Model 中实施 _post_put_hook 方法。这似乎是不可能的,但是使用可用于 flex 环境的 google-cloud-datastore 库。

有什么方法可以在每次插入后收到回调吗?或者我是否必须在数据存储区 API 前面放置一个 "proxy" API ,它会在每个 insert/delete 之后更新我的 ES 索引?

NDB.Model_post_put_hook() 仅当您通过 NDB 将实体写入数据存储时才有效,是的,不幸的是 NDB 库仅在 App Engine Python 标准中可用环境。我不知道 Cloud Datastore 中有这样的功能。如果我没记错的话,Firebase Realtime Database 或 Firestore 都有写入触发器,但我猜你也不急于迁移数据库。

在 Datastore 中,您需要使用上述方法的 "proxy" API,或者您需要修改 Datastore 客户端以在任何成功的写入操作时执行此操作.后者可能会带来更高的失败风险和 ElasticSearch 中的陈旧数据,尤其是在客户端不受您控制的情况下。

我认为如果一致且最新的搜索记录对您的用例很重要,那么自定义 API 是有意义的。数据存储和 Python / NDB(可能带有 Cloud Endpoints)将是一个很好的方法。

我在 GAE Python Standard 上有一个类似的解决方案 运行(尽管使用内置搜索 API 而不是 ElasticSearch)。如果您选择这条路线,您应该注意两个潜在的注意事项:

  1. _post_put_hook() 总是被调用,即使 put 操作失败。我在下面添加了一个代码示例。您可以在文档中找到更多详细信息:model hookshook methods, check_success()

  2. 将数据导出到 ElasticSearch 或搜索 API 会延长您的响应时间。这对于后台任务来说可能不是问题,只需在 _post_put_hook() 中调用导出功能即可。但是,如果用户提出请求,这可能是个问题。对于这些情况,您可以将导出操作推迟到不同的线程,方法是使用 deferred.defer() 方法或创建推送任务)。或多或少,它们是相同的。下面,我使用 defer().

  3. 为每一种要导出的搜索记录添加一个class方法。每当出现问题或移动应用程序/数据存储、添加新的搜索索引等时,您都可以调用此方法,然后从数据存储中逐批查询该类型的所有实体,并导出搜索记录。

延迟导出示例:

class CustomModel(ndb.Model):
    def _post_put_hook(self, future):
        try:
            if future.check_success() is None:
                deferred.defer(export_to_search, self.key)
        except:
            pass  # or log error to Cloud Console with logging.error('blah')

def export_to_search(key=None):
    try:
        if key is not None:
            entity = key.get()
            if entity is not None:
                call_export_api(entity)
    except:
        pass  # or log error to Cloud Console with logging.error('blah')

```