使用 elasticsearch 为 python 批量索引/创建文档
Bulk index / create documents with elasticsearch for python
我正在使用 python 生成大量具有随机内容的 elasticsearch 文档,并使用 elasticsearch-py 对它们进行索引。
简化的工作示例(只有一个字段的文档):
from elasticsearch import Elasticsearch
from random import getrandbits
es_client = Elasticsearch('https://elastic.host:9200')
for i in range(1,10000000):
document = {'my_field': getrandbits(64)}
es_client.index(index='my_index', document=document)
由于每个文档发出一个请求,我尝试通过使用 _bulk
API 发送 1000 个文档的块来加快速度。然而,到目前为止我的尝试都没有成功。
我从文档中了解到,您可以将可迭代对象传递给 bulk()
,所以我尝试了:
from elasticsearch import Elasticsearch
from random import getrandbits
es_client = Elasticsearch('https://elastic.host:9200')
document_list = []
for i in range(1,10000000):
document = {'my_field': getrandbits(64)}
document_list.append(document)
if i % 1000 == 0:
es_client.bulk(operations=document_list, index='my_index')
document_list = []
但这会导致
elasticsearch.BadRequestError: BadRequestError(400, 'illegal_argument_exception', 'Malformed action/metadata line [1], expected START_OBJECT or END_OBJECT but found [VALUE_STRING]')
好的,看来我混淆了两个不同的函数:helpers.bulk()
and Elasticsearch.bulk()
。两者都可以用来实现我打算做的事情,但它们的签名略有不同。
helpers.bulk()
函数接受一个 Elasticsearch()
对象和一个包含文档的可迭代对象作为参数。操作可以指定为 _op_type
,并且可以是 index
、create
、delete
或 update
之一。由于 _op_type
默认为 index
,我们可以忽略它并在这种情况下简单地传递文档列表:
from elasticsearch import Elasticsearch, helpers
from random import getrandbits
es_client = Elasticsearch('https://elastic.host:9200')
document_list = []
for i in range(1,10000000):
document = {'my_field': getrandbits(64)}
document_list.append(document)
if i % 1000 == 0:
helpers.bulk(es_client, document_list, index='my_index')
document_list = []
这很好用。
Elasticsearch.bulk()
函数可以替代使用,但 actions/operations 作为可迭代对象的一部分是强制性的,语法略有不同。这意味着我们需要有一个 dict
来指定操作(在本例中为 "index": {}
)以及每个文档的正文,而不是仅包含文档内容的 dict
.另见 _bulk
documentation:
from elasticsearch import Elasticsearch
from random import getrandbits
es_client = Elasticsearch('https://elastic.host:9200')
actions_list = []
for i in range(1,10000000):
document = {'my_field': getrandbits(64)}
actions_list.append({"index": {}, "doc": document})
if i % 1000 == 0:
es_client.bulk(operations=actions_list, index='my_index')
actions_list = []
这也很好用。
我假设以上两个在内部生成相同的 _bulk
REST API 语句,所以它们最终应该是等价的。
我正在使用 python 生成大量具有随机内容的 elasticsearch 文档,并使用 elasticsearch-py 对它们进行索引。
简化的工作示例(只有一个字段的文档):
from elasticsearch import Elasticsearch
from random import getrandbits
es_client = Elasticsearch('https://elastic.host:9200')
for i in range(1,10000000):
document = {'my_field': getrandbits(64)}
es_client.index(index='my_index', document=document)
由于每个文档发出一个请求,我尝试通过使用 _bulk
API 发送 1000 个文档的块来加快速度。然而,到目前为止我的尝试都没有成功。
我从文档中了解到,您可以将可迭代对象传递给 bulk()
,所以我尝试了:
from elasticsearch import Elasticsearch
from random import getrandbits
es_client = Elasticsearch('https://elastic.host:9200')
document_list = []
for i in range(1,10000000):
document = {'my_field': getrandbits(64)}
document_list.append(document)
if i % 1000 == 0:
es_client.bulk(operations=document_list, index='my_index')
document_list = []
但这会导致
elasticsearch.BadRequestError: BadRequestError(400, 'illegal_argument_exception', 'Malformed action/metadata line [1], expected START_OBJECT or END_OBJECT but found [VALUE_STRING]')
好的,看来我混淆了两个不同的函数:helpers.bulk()
and Elasticsearch.bulk()
。两者都可以用来实现我打算做的事情,但它们的签名略有不同。
helpers.bulk()
函数接受一个 Elasticsearch()
对象和一个包含文档的可迭代对象作为参数。操作可以指定为 _op_type
,并且可以是 index
、create
、delete
或 update
之一。由于 _op_type
默认为 index
,我们可以忽略它并在这种情况下简单地传递文档列表:
from elasticsearch import Elasticsearch, helpers
from random import getrandbits
es_client = Elasticsearch('https://elastic.host:9200')
document_list = []
for i in range(1,10000000):
document = {'my_field': getrandbits(64)}
document_list.append(document)
if i % 1000 == 0:
helpers.bulk(es_client, document_list, index='my_index')
document_list = []
这很好用。
Elasticsearch.bulk()
函数可以替代使用,但 actions/operations 作为可迭代对象的一部分是强制性的,语法略有不同。这意味着我们需要有一个 dict
来指定操作(在本例中为 "index": {}
)以及每个文档的正文,而不是仅包含文档内容的 dict
.另见 _bulk
documentation:
from elasticsearch import Elasticsearch
from random import getrandbits
es_client = Elasticsearch('https://elastic.host:9200')
actions_list = []
for i in range(1,10000000):
document = {'my_field': getrandbits(64)}
actions_list.append({"index": {}, "doc": document})
if i % 1000 == 0:
es_client.bulk(operations=actions_list, index='my_index')
actions_list = []
这也很好用。
我假设以上两个在内部生成相同的 _bulk
REST API 语句,所以它们最终应该是等价的。