使用 Python 更新所有使用字典输入的 elasticsearch 文档
Update all elasticsearch docs using a dict for input using Python
更新所有文档
背景信息
我有一个用例,我需要更新索引中的所有文档。我的来源类似于以下内容:
{
'hits': [
{'_index': 'main-index-v2',
'_type': '_doc',
'_id': 'ID_xzeta4955029dhs82901',
'_score': 8.403202,
'_source': {'id': 'ID_xzeta4955029dhs82901',
'employee_ids': ['J98234', 'J28373', 'CH13561', 'J98823', 'J12294'],
'non_employee_ids': [],
'friends_id': ['G8667', 'J98923', 'J28373', 'H82739', 'J98823'],
'local_date': '2022/01/10',
'local': True,
...
}
我可以使用 multi_match 查询轻松搜索我的索引,但这是针对单个 ID 的。
def create_multi_query(ids: str, fields: list=['employee_ids', 'non_employee_ids', 'friends_id']):
return {
"query": {
"multi_match": {
"query": f"{ids}",
"fields": fields,
"operator": "or"
}
}
}
hits = es.search(index='main-index-v2', body=create_multi_query('G8667'), scroll='2m')
我想提供字典和字段列表作为参数来更新我的索引。
示例:
{'J1234': 'J2875', 'CH1234': 'J2879'}
词典包含old_ids到new_ids。我想更新每个有旧 ID 的字段。
我的解决方案(到目前为止)
我已经编写了一个无痛脚本来更新 ID,但是它需要为每个字段使用一个 for 循环。该脚本所做的是逐个遍历每个字段。如果列表中的当前项目与我们的参数 'fromId' 匹配,我们将 'toId' 附加到列表,否则将当前项目添加到列表并继续。然后我们将字段设置为新列表。
无痛脚本示例
def result = [];
for (def item: ctx._source.employee_ids)
{
if (item == params.fromId) {
result .add(params.toId)
}
else {
result .add(item)
}} ctx._source.employee_ids= result;
def resultF = [];
for (def item: ctx._source.friends_id)
{
if (item == params.fromId) {
resultF .add(params.toId)
}
else {
resultF .add(item)
}} ctx._source.friends_id = resultF ;
这可以通过 elasticsearch_dsl
库中的 UpdateByQuery 执行。
更新调用示例。
def partial_update(es, items: dict):
assert es.ping() is True
tmp = []
for from_id, to_id in items.items():
result = execute_intermediate(from_id, to_id)
tmp.append(result)
return tmp
@retry((exceptions.ConflictError, exceptions.ConnectionError, exceptions.RequestError), value_type=dict, tries=3, delay=2, backoff=1)
def execute_intermediate(from_id, to_id):
from elasticsearch_dsl import UpdateByQuery
ubq = UpdateByQuery(
using=auth_es(),
doc_type='doc', index=settings.ES_WRITE_INDEX,
)
ubq = ubq.script(source=UPDATE_SCRIPT, lang='painless', params={'fromId': from_id, 'toId': to_id})
ubq = ubq.params(wait_for_completion=True)
res = ubq.execute().to_dict()
return res
创建一个中间函数来对单个 ID 执行更新,用重试装饰器包装。
问题
这样做需要我一个接一个地循环我的字典来执行更新。
如果我想增加我们要更新的字段数,我需要添加一个新的for循环。
问题
根据以上内容更新源中所有字段的最佳/最佳解决方案是什么?
有没有办法发送字典来查找与键匹配的所有文档,并在一次调用中更新值?
对此没有 out-of-the-box 解决方案。
对现有无痛脚本的一项改进是就地更改数组,同时使用参数中的映射以及要更新的字段列表。
PUT /test_replace_id/
{
"mappings": {
"properties": {
"employee_ids":{
"type": "keyword"
}
}
}
}
POST /test_replace_id/_doc/1
{
"employee_ids": ["old1","old2"],
"frieds_id": "old1"
}
POST /test_replace_id/_update/1
{
"script": {
"source": """
for (t in params.targets){
if (ctx._source[t] instanceof List){
for (int j=0; j<ctx._source[t].length; j++){
if (params.map.containsKey(ctx._source[t][j])) {
ctx._source[t][j] = params.map.get(ctx._source[t][j])
}
}
}else{
if (params.map.containsKey(ctx._source[t])) {
ctx._source[t] = params.map.get(ctx._source[t])
}
}
}
""",
"params":{
"targets": ["employee_ids","frieds_id"],
"map": {"old1":"new1"}
}
}
}
GET /test_replace_id/_search
这允许更大的灵活性,并且不需要迭代和更新。我们现在可以一次发送整个请求。
@Tomo_M求解答!
更新所有文档
背景信息
我有一个用例,我需要更新索引中的所有文档。我的来源类似于以下内容:
{
'hits': [
{'_index': 'main-index-v2',
'_type': '_doc',
'_id': 'ID_xzeta4955029dhs82901',
'_score': 8.403202,
'_source': {'id': 'ID_xzeta4955029dhs82901',
'employee_ids': ['J98234', 'J28373', 'CH13561', 'J98823', 'J12294'],
'non_employee_ids': [],
'friends_id': ['G8667', 'J98923', 'J28373', 'H82739', 'J98823'],
'local_date': '2022/01/10',
'local': True,
...
}
我可以使用 multi_match 查询轻松搜索我的索引,但这是针对单个 ID 的。
def create_multi_query(ids: str, fields: list=['employee_ids', 'non_employee_ids', 'friends_id']):
return {
"query": {
"multi_match": {
"query": f"{ids}",
"fields": fields,
"operator": "or"
}
}
}
hits = es.search(index='main-index-v2', body=create_multi_query('G8667'), scroll='2m')
我想提供字典和字段列表作为参数来更新我的索引。
示例:
{'J1234': 'J2875', 'CH1234': 'J2879'}
词典包含old_ids到new_ids。我想更新每个有旧 ID 的字段。
我的解决方案(到目前为止)
我已经编写了一个无痛脚本来更新 ID,但是它需要为每个字段使用一个 for 循环。该脚本所做的是逐个遍历每个字段。如果列表中的当前项目与我们的参数 'fromId' 匹配,我们将 'toId' 附加到列表,否则将当前项目添加到列表并继续。然后我们将字段设置为新列表。
无痛脚本示例
def result = [];
for (def item: ctx._source.employee_ids)
{
if (item == params.fromId) {
result .add(params.toId)
}
else {
result .add(item)
}} ctx._source.employee_ids= result;
def resultF = [];
for (def item: ctx._source.friends_id)
{
if (item == params.fromId) {
resultF .add(params.toId)
}
else {
resultF .add(item)
}} ctx._source.friends_id = resultF ;
这可以通过 elasticsearch_dsl
库中的 UpdateByQuery 执行。
更新调用示例。
def partial_update(es, items: dict):
assert es.ping() is True
tmp = []
for from_id, to_id in items.items():
result = execute_intermediate(from_id, to_id)
tmp.append(result)
return tmp
@retry((exceptions.ConflictError, exceptions.ConnectionError, exceptions.RequestError), value_type=dict, tries=3, delay=2, backoff=1)
def execute_intermediate(from_id, to_id):
from elasticsearch_dsl import UpdateByQuery
ubq = UpdateByQuery(
using=auth_es(),
doc_type='doc', index=settings.ES_WRITE_INDEX,
)
ubq = ubq.script(source=UPDATE_SCRIPT, lang='painless', params={'fromId': from_id, 'toId': to_id})
ubq = ubq.params(wait_for_completion=True)
res = ubq.execute().to_dict()
return res
创建一个中间函数来对单个 ID 执行更新,用重试装饰器包装。
问题
这样做需要我一个接一个地循环我的字典来执行更新。
如果我想增加我们要更新的字段数,我需要添加一个新的for循环。
问题
根据以上内容更新源中所有字段的最佳/最佳解决方案是什么?
有没有办法发送字典来查找与键匹配的所有文档,并在一次调用中更新值?
对此没有 out-of-the-box 解决方案。
对现有无痛脚本的一项改进是就地更改数组,同时使用参数中的映射以及要更新的字段列表。
PUT /test_replace_id/
{
"mappings": {
"properties": {
"employee_ids":{
"type": "keyword"
}
}
}
}
POST /test_replace_id/_doc/1
{
"employee_ids": ["old1","old2"],
"frieds_id": "old1"
}
POST /test_replace_id/_update/1
{
"script": {
"source": """
for (t in params.targets){
if (ctx._source[t] instanceof List){
for (int j=0; j<ctx._source[t].length; j++){
if (params.map.containsKey(ctx._source[t][j])) {
ctx._source[t][j] = params.map.get(ctx._source[t][j])
}
}
}else{
if (params.map.containsKey(ctx._source[t])) {
ctx._source[t] = params.map.get(ctx._source[t])
}
}
}
""",
"params":{
"targets": ["employee_ids","frieds_id"],
"map": {"old1":"new1"}
}
}
}
GET /test_replace_id/_search
这允许更大的灵活性,并且不需要迭代和更新。我们现在可以一次发送整个请求。
@Tomo_M求解答!