ElasticSearch 更新不是即时的,你如何等待 ElasticSearch 完成更新它的索引?

ElasticSearch updates are not immediate, how do you wait for ElasticSearch to finish updating it's index?

我正在尝试提高针对 ElasticSearch 进行测试的套件的性能。

测试需要很长时间,因为 Elasticsearch 在更新后不会立即更新它的索引。例如,以下代码 运行s 没有引发断言错误。

from elasticsearch import Elasticsearch
elasticsearch = Elasticsearch('es.test')

# Asumming that this is a clean and empty elasticsearch instance
elasticsearch.update(
     index='blog',
     doc_type=,'blog'
     id=1,
     body={
        ....
    }
)

results = elasticsearch.search()
assert not results
# results are not populated

目前针对此问题的联合解决方案是删除对代码的 time.sleep 调用,让 ElasticSearch 有时间更新它的索引。

from time import sleep
from elasticsearch import Elasticsearch
elasticsearch = Elasticsearch('es.test')

# Asumming that this is a clean and empty elasticsearch instance
elasticsearch.update(
     index='blog',
     doc_type=,'blog'
     id=1,
     body={
        ....
    }
)

# Don't want to use sleep functions
sleep(1)

results = elasticsearch.search()
assert len(results) == 1
# results are now populated

显然这不是很好,因为它很容易失败,假设如果 ElasticSearch 更新其索引的时间超过一秒,那么测试将失败,尽管这不太可能。此外,当您 运行 像这样进行 100 次测试时,它会非常慢。

我尝试解决此问题的方法是查询 pending cluster jobs 以查看是否还有任何任务需要完成。然而,这不起作用,并且此代码将 运行 没有断言错误。

from elasticsearch import Elasticsearch
elasticsearch = Elasticsearch('es.test')

# Asumming that this is a clean and empty elasticsearch instance
elasticsearch.update(
     index='blog',
     doc_type=,'blog'
     id=1,
     body={
        ....
    }
)

# Query if there are any pending tasks
while elasticsearch.cluster.pending_tasks()['tasks']:
    pass

results = elasticsearch.search()
assert not results
# results are not populated

所以基本上,回到我原来的问题,ElasticSearch 更新不是 立即,你如何等待 ElasticSearch 完成更新它的索引?

从 5.0.0 版本开始,elasticsearch 有一个选项:

 ?refresh=wait_for

在索引、更新、删除和批量 api 上。这样,在结果在 ElasticSearch 中可见之前,请求不会收到响应。 (耶!)

有关详细信息,请参阅 https://www.elastic.co/guide/en/elasticsearch/reference/master/docs-refresh.html

编辑:似乎这个功能已经是最新的 Python elasticsearch api 的一部分: https://elasticsearch-py.readthedocs.io/en/master/api.html#elasticsearch.Elasticsearch.index

将您的 elasticsearch.update 更改为:

elasticsearch.update(
     index='blog',
     doc_type='blog'
     id=1,
     refresh='wait_for',
     body={
        ....
    }
)

而且您不需要任何睡眠或轮询。

似乎对我有用:

els.indices.refresh(index)
els.cluster.health(wait_for_no_relocating_shards=True,wait_for_active_shards='all')

如果不想等待集群刷新间隔也可以调用elasticsearch.Refresh('blog')

如果您使用批量助手,您可以这样做:

from elasticsearch.helpers import bulk    
bulk(client=self.es, actions=data, refresh='wait_for')

Elasticsearch 做 near real-time search。 updated/indexed 文档不能立即搜索,只能在下一次刷新操作后搜索。每 1 秒安排一次刷新。

要在 updating/indexing 之后检索文档,您应该改用 GET api。 By default, the get API is realtime, and is not affected by the refresh rate of the index。这意味着如果 update/index 正确完成,您应该在 GET 请求的响应中看到修改。

如果您坚持使用 SEARCH api 检索 updating/indexing 之后的文档。 Then from the documentation, there are 3 solutions:

  • 等待刷新间隔
  • 在 index/update/delete 请求中设置 ?refresh option
  • 在 index/update 请求后使用 Refresh API 显式完成刷新 (POST _refresh)。但是,请注意刷新为 resource-intensive.