在 Google Cloud Datastore 上每 insert/delete 之后是否有回调?
Is there anyway to get a callback after every insert/delete on Google Cloud Datastore?
我想将我的 Cloud Datastore 内容与 ElasticSearch 中的索引同步。我希望 ES 索引始终与 Datastore 的内容保持同步。
我注意到 Appengine Python 标准环境中提供了一种等效机制,方法是在数据存储区 Model
中实施 _post_put_hook
方法。这似乎是不可能的,但是使用可用于 flex 环境的 google-cloud-datastore
库。
有什么方法可以在每次插入后收到回调吗?或者我是否必须在数据存储区 API 前面放置一个 "proxy" API ,它会在每个 insert/delete 之后更新我的 ES 索引?
NDB.Model
的 _post_put_hook()
仅当您通过 NDB 将实体写入数据存储时才有效,是的,不幸的是 NDB 库仅在 App Engine Python 标准中可用环境。我不知道 Cloud Datastore 中有这样的功能。如果我没记错的话,Firebase Realtime Database 或 Firestore 都有写入触发器,但我猜你也不急于迁移数据库。
在 Datastore 中,您需要使用上述方法的 "proxy" API,或者您需要修改 Datastore 客户端以在任何成功的写入操作时执行此操作.后者可能会带来更高的失败风险和 ElasticSearch 中的陈旧数据,尤其是在客户端不受您控制的情况下。
我认为如果一致且最新的搜索记录对您的用例很重要,那么自定义 API 是有意义的。数据存储和 Python / NDB(可能带有 Cloud Endpoints)将是一个很好的方法。
我在 GAE Python Standard 上有一个类似的解决方案 运行(尽管使用内置搜索 API 而不是 ElasticSearch)。如果您选择这条路线,您应该注意两个潜在的注意事项:
_post_put_hook()
总是被调用,即使 put 操作失败。我在下面添加了一个代码示例。您可以在文档中找到更多详细信息:model hooks,
hook methods,
check_success()
将数据导出到 ElasticSearch 或搜索 API 会延长您的响应时间。这对于后台任务来说可能不是问题,只需在 _post_put_hook()
中调用导出功能即可。但是,如果用户提出请求,这可能是个问题。对于这些情况,您可以将导出操作推迟到不同的线程,方法是使用 deferred.defer()
方法或创建推送任务)。或多或少,它们是相同的。下面,我使用 defer()
.
为每一种要导出的搜索记录添加一个class方法。每当出现问题或移动应用程序/数据存储、添加新的搜索索引等时,您都可以调用此方法,然后从数据存储中逐批查询该类型的所有实体,并导出搜索记录。
延迟导出示例:
class CustomModel(ndb.Model):
def _post_put_hook(self, future):
try:
if future.check_success() is None:
deferred.defer(export_to_search, self.key)
except:
pass # or log error to Cloud Console with logging.error('blah')
def export_to_search(key=None):
try:
if key is not None:
entity = key.get()
if entity is not None:
call_export_api(entity)
except:
pass # or log error to Cloud Console with logging.error('blah')
```
我想将我的 Cloud Datastore 内容与 ElasticSearch 中的索引同步。我希望 ES 索引始终与 Datastore 的内容保持同步。
我注意到 Appengine Python 标准环境中提供了一种等效机制,方法是在数据存储区 Model
中实施 _post_put_hook
方法。这似乎是不可能的,但是使用可用于 flex 环境的 google-cloud-datastore
库。
有什么方法可以在每次插入后收到回调吗?或者我是否必须在数据存储区 API 前面放置一个 "proxy" API ,它会在每个 insert/delete 之后更新我的 ES 索引?
NDB.Model
的 _post_put_hook()
仅当您通过 NDB 将实体写入数据存储时才有效,是的,不幸的是 NDB 库仅在 App Engine Python 标准中可用环境。我不知道 Cloud Datastore 中有这样的功能。如果我没记错的话,Firebase Realtime Database 或 Firestore 都有写入触发器,但我猜你也不急于迁移数据库。
在 Datastore 中,您需要使用上述方法的 "proxy" API,或者您需要修改 Datastore 客户端以在任何成功的写入操作时执行此操作.后者可能会带来更高的失败风险和 ElasticSearch 中的陈旧数据,尤其是在客户端不受您控制的情况下。
我认为如果一致且最新的搜索记录对您的用例很重要,那么自定义 API 是有意义的。数据存储和 Python / NDB(可能带有 Cloud Endpoints)将是一个很好的方法。
我在 GAE Python Standard 上有一个类似的解决方案 运行(尽管使用内置搜索 API 而不是 ElasticSearch)。如果您选择这条路线,您应该注意两个潜在的注意事项:
_post_put_hook()
总是被调用,即使 put 操作失败。我在下面添加了一个代码示例。您可以在文档中找到更多详细信息:model hooks, hook methods, check_success()将数据导出到 ElasticSearch 或搜索 API 会延长您的响应时间。这对于后台任务来说可能不是问题,只需在
_post_put_hook()
中调用导出功能即可。但是,如果用户提出请求,这可能是个问题。对于这些情况,您可以将导出操作推迟到不同的线程,方法是使用deferred.defer()
方法或创建推送任务)。或多或少,它们是相同的。下面,我使用defer()
.为每一种要导出的搜索记录添加一个class方法。每当出现问题或移动应用程序/数据存储、添加新的搜索索引等时,您都可以调用此方法,然后从数据存储中逐批查询该类型的所有实体,并导出搜索记录。
延迟导出示例:
class CustomModel(ndb.Model):
def _post_put_hook(self, future):
try:
if future.check_success() is None:
deferred.defer(export_to_search, self.key)
except:
pass # or log error to Cloud Console with logging.error('blah')
def export_to_search(key=None):
try:
if key is not None:
entity = key.get()
if entity is not None:
call_export_api(entity)
except:
pass # or log error to Cloud Console with logging.error('blah')
```