定期处理和更新 elasticsearch 索引中的文档
Periodically process and update documents in elasticsearch index
我需要想出一个策略来定期高效地处理和更新 elasticsearch 索引中的文档。我不必查看我之前处理过的文件。
我的设置是我有一个很长的 运行ning 过程,它不断地将文档插入索引,比如说大约。每小时 500 个文档(想想常见的日志记录示例)。
我需要找到一个解决方案来定期更新一些文档(例如通过 cron 作业)到 运行 特定字段(例如文本字段)上的一些代码以增强该文档新字段的数量。我想这样做是为了在索引上提供更细粒度的聚合。在日志类比中,这可能是,例如,我从日志条目(文档)中获取 UserAgent 字符串,对其进行一些解析,然后将一些新字段添加回该文档并为其编制索引。
所以我的方法是:
- 获得一些我以前没有看过的文件(甚至全部)。例如,我可以通过组合
must_not
和 exists
来查询它们。
- 运行 我在这些文档上的代码(运行 解析器,计算一些新东西,等等)。
- 更新之前获得的文件(最好是通过批量 api)。
我知道有Update by query API。但这似乎不在这里,因为我需要 运行 我自己的代码(顺便说一下,这取决于外部库),在我的服务器上而不是作为一个无痛脚本,它不会提供我需要的综合任务.
我正在通过 python 访问 elasticsearch。
现在的问题是我不知道如何实现上面的方法。例如。如果第一步得到的文档数量大于myindex.settings.index.max_result_window
怎么办?
有什么想法吗?
我考虑了@Jay 的评论并最终得到了这个模式,目前:
from elasticsearch import Elasticsearch
from elasticsearch.helpers import bulk
from elasticsearch.helpers import scan
from my_module.postprocessing import post_process_doc
es = Elasticsearch(...)
es.ping()
def update_docs( docs ):
""""""
for idx,doc in enumerate(docs):
if idx % 10000 == 0:
print( 'next 10k' )
new_field_value = post_process_doc( doc )
doc_update = {
"_index": doc["_index"],
"_id" : doc["_id"],
"_op_type" : "update",
"doc" : { <<the new field>> : new_field_value }
}
yield doc_update
docs = scan( es, query='{ "query" : { "bool": { "must_not": { "exists": { "field": <<the new field>> }} } }}', index=index, scroll="1m", preserve_order=True )
bulk( es, update_docs( docs ) )
评论:
- 我了解到,当您滚动并在查询请求中传递相应的 ID 时,elasticsearch 会保留搜索结果视图。 scan abstraction method 将为您处理。上面方法中的 scroll-parameter 告诉 elasticsearch 视图会打开多长时间,即视图会保持多长时间。
- 正如我在评论中所述,文档说它们 no longer recommend using the scroll API for deep pagination. If you need to preserve the index state while paging use .. point in time (PIT),但我还没有尝试过。
- 在我的实现中,我需要通过
preserve_over=True
,否则会抛出错误。
- 记得事先更新索引的映射,例如,当您想要将 nested fields 添加为文档中的另一个字段时。
我需要想出一个策略来定期高效地处理和更新 elasticsearch 索引中的文档。我不必查看我之前处理过的文件。
我的设置是我有一个很长的 运行ning 过程,它不断地将文档插入索引,比如说大约。每小时 500 个文档(想想常见的日志记录示例)。
我需要找到一个解决方案来定期更新一些文档(例如通过 cron 作业)到 运行 特定字段(例如文本字段)上的一些代码以增强该文档新字段的数量。我想这样做是为了在索引上提供更细粒度的聚合。在日志类比中,这可能是,例如,我从日志条目(文档)中获取 UserAgent 字符串,对其进行一些解析,然后将一些新字段添加回该文档并为其编制索引。
所以我的方法是:
- 获得一些我以前没有看过的文件(甚至全部)。例如,我可以通过组合
must_not
和exists
来查询它们。 - 运行 我在这些文档上的代码(运行 解析器,计算一些新东西,等等)。
- 更新之前获得的文件(最好是通过批量 api)。
我知道有Update by query API。但这似乎不在这里,因为我需要 运行 我自己的代码(顺便说一下,这取决于外部库),在我的服务器上而不是作为一个无痛脚本,它不会提供我需要的综合任务.
我正在通过 python 访问 elasticsearch。
现在的问题是我不知道如何实现上面的方法。例如。如果第一步得到的文档数量大于myindex.settings.index.max_result_window
怎么办?
有什么想法吗?
我考虑了@Jay 的评论并最终得到了这个模式,目前:
from elasticsearch import Elasticsearch
from elasticsearch.helpers import bulk
from elasticsearch.helpers import scan
from my_module.postprocessing import post_process_doc
es = Elasticsearch(...)
es.ping()
def update_docs( docs ):
""""""
for idx,doc in enumerate(docs):
if idx % 10000 == 0:
print( 'next 10k' )
new_field_value = post_process_doc( doc )
doc_update = {
"_index": doc["_index"],
"_id" : doc["_id"],
"_op_type" : "update",
"doc" : { <<the new field>> : new_field_value }
}
yield doc_update
docs = scan( es, query='{ "query" : { "bool": { "must_not": { "exists": { "field": <<the new field>> }} } }}', index=index, scroll="1m", preserve_order=True )
bulk( es, update_docs( docs ) )
评论:
- 我了解到,当您滚动并在查询请求中传递相应的 ID 时,elasticsearch 会保留搜索结果视图。 scan abstraction method 将为您处理。上面方法中的 scroll-parameter 告诉 elasticsearch 视图会打开多长时间,即视图会保持多长时间。
- 正如我在评论中所述,文档说它们 no longer recommend using the scroll API for deep pagination. If you need to preserve the index state while paging use .. point in time (PIT),但我还没有尝试过。
- 在我的实现中,我需要通过
preserve_over=True
,否则会抛出错误。 - 记得事先更新索引的映射,例如,当您想要将 nested fields 添加为文档中的另一个字段时。