使用 elasticsearch-py 库的 ElasticSearch ConnectionPool
ElasticSearch ConnectionPool using elasticsearch-py library
我是 ElasticSearch 的新手,正在尝试使用来自 ElasticSearch ConnectionPool 的并发连接向 ElasticSearch 中的索引添加条目 [通过 Transport class
]。
这是我的代码:
import elasticsearch
from elasticsearch.transport import Transport
def init_connection():
transport = Transport([{'host':SERVER_URL}], port=SERVER_PORT, randomize_hosts=False)
transport.add_connection(host=SERVER_URL+SERVER_PORT)
return transport
def add_entries_to_es(id, name):
transport = init_connection()
doc = {
'name': name,
'postDate': datetime.datetime.now(),
'valid': "true",
'suggest': {
"input": name,
'output': name,
'payload': {'domain_id': id}
}
}
conn = transport.getConnection()
es = elasticsearch.Elasticsearch(connection_class=conn)
res = es.index(index=ES_INDEX_NAME, doc_type=ES_DOC_TYPE, id=id, body=doc)
...
我得到以下错误:
File "/my_project/elastichelper.py", line 23, in init_connection
transport.add_connection(host=SERVER_URL+SERVER_PORT)
File "/Library/Python/2.7/site-packages/elasticsearch/transport.py", line 139, in add_connection
self.set_connections(self.hosts)
File "/Library/Python/2.7/site-packages/elasticsearch/transport.py", line 169, in set_connections
connections = map(_create_connection, hosts)
File "/Library/Python/2.7/site-packages/elasticsearch/transport.py", line 161, in _create_connection
kwargs.update(host)
ValueError: dictionary update sequence element #0 has length 1; 2 is required
我不确定 Transport class
是否是在 ElasticSearch 中实例化 ConnectionPool
的正确方法。但是,我从文档中读到 Transport class 处理各个连接的实例化以及创建连接池来保存它们。
我没有找到正确的方法来实例化 ConnectionPool
并有效地使用池中的连接。阅读和谷歌搜索对我没有帮助。
我也知道 helpers.bulk() API,但我对使用它感到困惑,因为在向索引添加条目的同时,我也删除了无效条目。
我发现只需使用 ElasticSearch
classes 实例和适当的 timeout
值集 [对我来说, timeout=30
足够好] index
方法有效。像这样:
doc = {
'name': name,
'postDate': datetime.datetime.now(),
'valid': "true",
'suggest': {
"input": name,
'output': name,
'payload': {'domain_id': id}
}
}
es = elasticsearch.Elasticsearch()
res = es.index(index=ES_INDEX_NAME, doc_type=ES_DOC_TYPE, id=id, body=doc, timeout=30)
我最初遇到 timeout
简单 ElasticSearch
class 实例的问题,已通过上述更改修复。
我根本不需要明确地使用 Transport
或 Connection
class 实例。
我是 ElasticSearch 的新手,正在尝试使用来自 ElasticSearch ConnectionPool 的并发连接向 ElasticSearch 中的索引添加条目 [通过 Transport class
]。
这是我的代码:
import elasticsearch
from elasticsearch.transport import Transport
def init_connection():
transport = Transport([{'host':SERVER_URL}], port=SERVER_PORT, randomize_hosts=False)
transport.add_connection(host=SERVER_URL+SERVER_PORT)
return transport
def add_entries_to_es(id, name):
transport = init_connection()
doc = {
'name': name,
'postDate': datetime.datetime.now(),
'valid': "true",
'suggest': {
"input": name,
'output': name,
'payload': {'domain_id': id}
}
}
conn = transport.getConnection()
es = elasticsearch.Elasticsearch(connection_class=conn)
res = es.index(index=ES_INDEX_NAME, doc_type=ES_DOC_TYPE, id=id, body=doc)
...
我得到以下错误:
File "/my_project/elastichelper.py", line 23, in init_connection
transport.add_connection(host=SERVER_URL+SERVER_PORT)
File "/Library/Python/2.7/site-packages/elasticsearch/transport.py", line 139, in add_connection
self.set_connections(self.hosts)
File "/Library/Python/2.7/site-packages/elasticsearch/transport.py", line 169, in set_connections
connections = map(_create_connection, hosts)
File "/Library/Python/2.7/site-packages/elasticsearch/transport.py", line 161, in _create_connection
kwargs.update(host)
ValueError: dictionary update sequence element #0 has length 1; 2 is required
我不确定 Transport class
是否是在 ElasticSearch 中实例化 ConnectionPool
的正确方法。但是,我从文档中读到 Transport class 处理各个连接的实例化以及创建连接池来保存它们。
我没有找到正确的方法来实例化 ConnectionPool
并有效地使用池中的连接。阅读和谷歌搜索对我没有帮助。
我也知道 helpers.bulk() API,但我对使用它感到困惑,因为在向索引添加条目的同时,我也删除了无效条目。
我发现只需使用 ElasticSearch
classes 实例和适当的 timeout
值集 [对我来说, timeout=30
足够好] index
方法有效。像这样:
doc = {
'name': name,
'postDate': datetime.datetime.now(),
'valid': "true",
'suggest': {
"input": name,
'output': name,
'payload': {'domain_id': id}
}
}
es = elasticsearch.Elasticsearch()
res = es.index(index=ES_INDEX_NAME, doc_type=ES_DOC_TYPE, id=id, body=doc, timeout=30)
我最初遇到 timeout
简单 ElasticSearch
class 实例的问题,已通过上述更改修复。
我根本不需要明确地使用 Transport
或 Connection
class 实例。