Elasticsearch: create or update a document using Python

I am using elasticsearch-py for my Elasticsearch operations.

I am trying to use elasticsearch.helpers.bulk to create or update multiple records.

from elasticsearch import Elasticsearch
from elasticsearch import helpers
es = Elasticsearch()

data = [
    {
        "_index": "customer",
        "_type": "external",
        "_op_type": "create",
        "_id": 3,
        "doc" : {"name": "test"}
    },
    {
        "_index": "customer",
        "_type": "external",
        "_op_type": "create",
        "_id": 4,
        "doc" : {"name": "test"}
    },
    {
        "_index": "customer",
        "_type": "external",
        "_op_type": "create",
        "_id": 5,
        "doc" : {"name": "test"}
    },
    {
        "_index": "customer",
        "_type": "external",
        "_op_type": "create",
        "_id": 6,
        "doc" : {"name": "test"}
    },
]


print(helpers.bulk(es, data))

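Is there a way to create a document if it does not exist, and update it otherwise, in a single bulk call?

Right now we can only set _op_type to either create or update. If we use update and the record does not exist, it raises an error: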

Traceback (most recent call last):
  File "/tmp/test.py", line 37, in <module>
    print helpers.bulk(es, data)
  File "/local/lib/python2.7/site-packages/elasticsearch/helpers/__init__.py", line 182, in bulk
    for ok, item in streaming_bulk(client, actions, **kwargs):
  File "/local/lib/python2.7/site-packages/elasticsearch/helpers/__init__.py", line 155, in streaming_bulk
    raise BulkIndexError('%i document(s) failed to index.' % len(errors), errors)
elasticsearch.helpers.BulkIndexError: ('4 document(s) failed to index.', [{u'update': {u'status': 404, u'_type': u'external', u'_id': u'3', u'error': u'DocumentMissingException[[customer][-1] [external][3]: document missing]', u'_index': u'customer'}}, {u'update': {u'status': 404, u'_type': u'external', u'_id': u'4', u'error': u'DocumentMissingException[[customer][-1] [external][4]: document missing]', u'_index': u'customer'}}, {u'update': {u'status': 404, u'_type': u'external', u'_id': u'5', u'error': u'DocumentMissingException[[customer][-1] [external][5]: document missing]', u'_index': u'customer'}}, {u'update': {u'status': 404, u'_type': u'external', u'_id': u'6', u'error': u'DocumentMissingException[[customer][-1] [external][6]: document missing]', u'_index': u'customer'}}])
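
The error list in the traceback above has a regular shape: each entry is a one-key dict mapping the operation type to its details, including the HTTP status and the _id. As a minimal sketch, assuming that shape holds, the ids that failed because the document was missing could be collected as follows. The names missing_ids and sample_errors are hypothetical; the sample entries are copied from the traceback with the u prefixes dropped.

```python
def missing_ids(errors):
    """Return the _id of every bulk item that failed with HTTP status 404."""
    ids = []
    for item in errors:
        # Each item looks like {"update": {"status": 404, "_id": "3", ...}}
        for details in item.values():
            if details.get("status") == 404:
                ids.append(details.get("_id"))
    return ids


# Two entries taken from the traceback in the question.
sample_errors = [
    {"update": {"status": 404, "_type": "external", "_id": "3",
                "error": "DocumentMissingException[[customer][-1] [external][3]: document missing]",
                "_index": "customer"}},
    {"update": {"status": 404, "_type": "external", "_id": "4",
                "error": "DocumentMissingException[[customer][-1] [external][4]: document missing]",
                "_index": "customer"}},
]

print(missing_ids(sample_errors))
```

In a real run the list would presumably come from the second argument of the raised BulkIndexError (the one printed after the message in the traceback), for example via exc.args[1] inside an except block.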

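According to the _bulk endpoint documentation, you can and should use the index operation for this, provided your documents always have the same identifier.

create is useful when a document is being created for the first time; update is better suited to partial and/or scripted updates.

You can also omit _op_type entirely; index is used by default.

I tried the solution suggested by @Val and it works well.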
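
If a partial update is still preferred over replacing the whole document with index, the bulk update action also accepts doc_as_upsert, which inserts the doc when the id does not exist yet. The following is a minimal sketch of building such actions, not a definitive implementation: build_upsert_actions and its input shape (a dict of id to partial document) are hypothetical, and _type is only included when passed, since mapping types are not accepted by newer Elasticsearch versions.

```python
def build_upsert_actions(index, docs, doc_type=None):
    """Yield bulk 'update' actions that insert the doc when the id is missing.

    `docs` maps document id -> partial document (hypothetical input shape).
    """
    for doc_id, fields in docs.items():
        action = {
            "_op_type": "update",
            "_index": index,
            "_id": doc_id,
            "doc": fields,          # partial document to merge
            "doc_as_upsert": True,  # use `doc` as the new document if none exists
        }
        if doc_type is not None:
            action["_type"] = doc_type
        yield action


actions = list(
    build_upsert_actions(
        "customer",
        {3: {"name": "test"}, 4: {"name": "test"}},
        doc_type="external",
    )
)
print(actions[0])

# With a reachable cluster, the actions could then be sent with:
#   from elasticsearch import Elasticsearch, helpers
#   helpers.bulk(Elasticsearch(), actions)
```

The trade-off compared with index is that existing fields not mentioned in doc are kept rather than overwritten.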

from elasticsearch import Elasticsearch
from elasticsearch import helpers
es = Elasticsearch()

data = [
    {
        "_index": "customer",
        "_type": "external",
        "_id": 3,
        "doc" : {"name": "test"}  # note: with the default index op, this is stored as a field named "doc"
    },
    {
        "_index": "customer",
        "_type": "external",
        "_id": 4,
        "doc" : {"name": "test"}
    },
    {
        "_index": "customer",
        "_type": "external",
        "_id": 5,
        "doc" : {"name": "test"}
    },
    {
        "_index": "customer",
        "_type": "external",
        "_id": 6,
        "doc" : {"name": "test"}
    },
]


print(helpers.bulk(es, data))