使用 Python 更新所有使用字典输入的 elasticsearch 文档

Update all elasticsearch docs using a dict for input using Python

更新所有文档

背景信息

我有一个用例,我需要更新索引中的所有文档。我的来源类似于以下内容:

{
  'hits': [
   {'_index': 'main-index-v2',
    '_type': '_doc',
    '_id': 'ID_xzeta4955029dhs82901',
    '_score': 8.403202,
    '_source': {'id': 'ID_xzeta4955029dhs82901',
        'employee_ids': ['J98234', 'J28373', 'CH13561', 'J98823', 'J12294'],
        'non_employee_ids': [],
        'friends_id': ['G8667', 'J98923', 'J28373', 'H82739', 'J98823'],
        'local_date': '2022/01/10',
        'local': True,
    ...
} 

我可以使用 multi_match 查询轻松搜索我的索引,但这是针对单个 ID 的。

def create_multi_query(ids: str, fields: list=['employee_ids', 'non_employee_ids', 'friends_id']):
    return {
        "query": {
            "multi_match": {
                "query": f"{ids}",
                "fields": fields,
                "operator": "or"
            }
        }
    }

hits = es.search(index='main-index-v2', body=create_multi_query('G8667'), scroll='2m')

我想提供字典和字段列表作为参数来更新我的索引。

示例:

{'J1234': 'J2875', 'CH1234': 'J2879'}

词典包含old_ids到new_ids。我想更新每个有旧 ID 的字段。

我的解决方案(到目前为止)

我已经编写了一个无痛脚本来更新 ID,但是它需要为每个字段使用一个 for 循环。该脚本所做的是逐个遍历每个字段。如果列表中的当前项目与我们的参数 'fromId' 匹配,我们将 'toId' 附加到列表,否则将当前项目添加到列表并继续。然后我们将字段设置为新列表。

无痛脚本示例

def result = [];
for (def item: ctx._source.employee_ids) 
    { 
        if (item == params.fromId) {
        result .add(params.toId)
    } 
    else {
        result .add(item)
    }} ctx._source.employee_ids= result; 

def resultF = [];
for (def item: ctx._source.friends_id) 
    { 
        if (item == params.fromId) {
        resultF .add(params.toId)
    } 
    else {
        resultF .add(item)
    }} ctx._source.friends_id = resultF ; 

这可以通过 elasticsearch_dsl 库中的 UpdateByQuery 执行。

更新调用示例。


def partial_update(es, items: dict):
    assert es.ping() is True
    tmp = []
    for from_id, to_id in items.items():
        result = execute_intermediate(from_id, to_id)
        tmp.append(result)
    return tmp

@retry((exceptions.ConflictError, exceptions.ConnectionError, exceptions.RequestError), value_type=dict, tries=3, delay=2, backoff=1)
def execute_intermediate(from_id, to_id):
    from elasticsearch_dsl  import UpdateByQuery
    ubq = UpdateByQuery(
        using=auth_es(),
        doc_type='doc', index=settings.ES_WRITE_INDEX,
    )
    ubq = ubq.script(source=UPDATE_SCRIPT, lang='painless', params={'fromId': from_id, 'toId': to_id})
    ubq = ubq.params(wait_for_completion=True)
    res = ubq.execute().to_dict()
    return res

创建一个中间函数来对单个 ID 执行更新,用重试装饰器包装。

问题

  1. 这样做需要我一个接一个地循环我的字典来执行更新。

  2. 如果我想增加我们要更新的字段数,我需要添加一个新的for循环。

问题

根据以上内容更新源中所有字段的最佳/最佳解决方案是什么?

有没有办法发送字典来查找与键匹配的所有文档,并在一次调用中更新值?

对此没有 out-of-the-box 解决方案。

对现有无痛脚本的一项改进是就地更改数组,同时使用参数中的映射以及要更新的字段列表。

PUT /test_replace_id/
{
  "mappings": {
    "properties": {
      "employee_ids":{
        "type": "keyword"
      }
    }
  }
}

POST /test_replace_id/_doc/1
{
  "employee_ids": ["old1","old2"],
  "frieds_id": "old1"
}

POST /test_replace_id/_update/1
{
  "script": {
    "source": """
      for (t in params.targets){
        if (ctx._source[t] instanceof List){
          for (int j=0; j<ctx._source[t].length; j++){
            if (params.map.containsKey(ctx._source[t][j])) {
              ctx._source[t][j] = params.map.get(ctx._source[t][j])
            }
          }
        }else{
          if (params.map.containsKey(ctx._source[t])) {
            ctx._source[t] = params.map.get(ctx._source[t])
          }
        }
      }
    """,
    "params":{
      "targets": ["employee_ids","frieds_id"],
      "map": {"old1":"new1"}
    }
  }
}
GET /test_replace_id/_search

这允许更大的灵活性,并且不需要迭代和更新。我们现在可以一次发送整个请求。

@Tomo_M求解答!