使用 elasticsearch DSL 等待 UpdateByQuery 完成

Wait for completion of UpdateByQuery with the elasticsearch DSL

我正在处理一个非常大的数据集,我需要清理(删除)一些文档的一些属性,然后立即将这样的 属性 添加到其他文档。有时,删除了 属性 的文档是我应该更新的文档。问题是有时会出现 ConflictError,我想知道如何等待第一个查询完全执行后再执行第二个查询。这是我正在使用的代码:

ubq = UpdateByQuery(using=self.es, index=self.index).update_from_dict(query1).script(source=script_remove_source)
ubq.execute()

ubq = UpdateByQuery(using=self.es, index=self.index).update_from_dict(query2).script(source=script_add_source)
ubq.execute()

有什么想法吗?

在 elastic 文档中他们提到了参数 wait_for_completion, but they don't present an example of use. And anyway, that's not the Elasticsearch DSL. I read the DSL docs 但没有提到同步或异步。

我现在正在做的是在 3 秒之间进行睡眠...它有效,但那太糟糕了。

提前致谢!

elasticsearch-dsl-pyelasticsearch-py 之上的高级 API。看起来 wait_for_completion 已经默认为 true (https://github.com/elastic/elasticsearch-py/blob/9f4baacb7059c9fb1f949fd8e726749137408a9c/elasticsearch/client/init.py#L936) and it looks like elasticsearch-dsl-py doesn't change that (https://github.com/elastic/elasticsearch-dsl-py/blob/601f7e9c5a708a3b6144851053e0544660bcf0a7/elasticsearch_dsl/update_by_query.py#L145)。

如果触发刷新怎么办?看起来你可以根据 https://github.com/elastic/elasticsearch-dsl-py/issues/870.

中的示例将此类参数转发到较低级别 API

是否可以有另一个进程 interacting/updating 记录?

我终于用retry_on_conflict搞定了:

es.update(
                index=index,
                doc_type=doc_type,
                id=id_str,
                body={"doc": {
                    session: state
                }},
                retry_on_conflict=5
            )

试试这个

from elasticsearch_dsl import Q, Search, Index

ubq = UpdateByQuery(using=self.es, index=self.index).update_from_dict(query1).script(source=script_remove_source)
ubq.execute()

Index(self.index).refresh()
ubq = UpdateByQuery(using=self.es, index=self.index).update_from_dict(query2).script(source=script_add_source)
ubq.execute()