How to create request body for Python Elasticsearch mSearch
I am trying to run a multi-search request with the Elasticsearch Python client. I can run a singular search correctly, but can't figure out how to format the msearch request. According to the documentation, the body of the request needs to be formatted as:
The request definitions (metadata-search request definition pairs), as
either a newline separated string, or a sequence of dicts to serialize
(one per row).
What is the best way to create this request body? I have been searching for examples but can't seem to find any.
Got it! Here is what I did, for anyone else who needs it...
query_list = ""
es = ElasticSearch("myurl")
for obj in my_list:
query = constructQuery(name)
query_count += 1
query_list += json.dumps({})
query_list += json.dumps(query)
if query_count <= 19:
query_list += "\n"
if query_count == 20:
es.msearch(index = "m_index", body = query_list)
I was getting tripped up by having to add the index twice. Even when using the Python client, you still have to include the index/header part described in the original docs. It works now!
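For illustration, a minimal sketch of a single header/body pair where the index is named in the header line itself rather than passed as the index= argument (the query here is just a placeholder):

import json

query = {"query": {"match": {"title": "test"}}}   # placeholder search body

# header line names the index, body line carries the query;
# each line is terminated by a newline
pair = json.dumps({"index": "m_index"}) + "\n" + json.dumps(query) + "\n"

resp = es.msearch(body=pair)   # no index= needed when the header names it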
If you follow the demo from the official doc (even though it is for the Bulk API), you will see how to construct your requests in Python with the Elasticsearch client:

Here is the newline-separated-string way:
import json

def msearch():
    es = get_es_instance()

    search_arr = []
    # req_head
    search_arr.append({'index': 'my_test_index', 'type': 'doc_type_1'})
    # req_body
    search_arr.append({"query": {"term": {"text": "bag"}}, 'from': 0, 'size': 2})
    # req_head
    search_arr.append({'index': 'my_test_index', 'type': 'doc_type_2'})
    # req_body
    search_arr.append({"query": {"match_all": {}}, 'from': 0, 'size': 2})

    request = ''
    for each in search_arr:
        request += '%s \n' % json.dumps(each)

    # as you can see, you just need to feed the <body> parameter,
    # and don't need to specify the <index> and <doc_type> as usual
    resp = es.msearch(body=request)
As you can see, the final request is built out of several req_units. Each req_unit is constructed like this:

request_header(search controls such as index_name, optional mapping-types, search-types, etc.)\n
request_body(the query details for this request)\n
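For reference, the request string built above ends up as newline-delimited JSON, one header line followed by one body line per search, roughly:

{"index": "my_test_index", "type": "doc_type_1"}
{"query": {"term": {"text": "bag"}}, "from": 0, "size": 2}
{"index": "my_test_index", "type": "doc_type_2"}
{"query": {"match_all": {}}, "from": 0, "size": 2}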
The sequence-of-dicts-to-serialize way is almost the same as the previous one, except you do not need to convert it to a string:
def msearch():
    es = get_es_instance()

    request = []

    req_head = {'index': 'my_test_index', 'type': 'doc_type_1'}
    req_body = {
        'query': {'term': {'text': 'bag'}},
        'from': 0, 'size': 2}
    request.extend([req_head, req_body])

    req_head = {'index': 'my_test_index', 'type': 'doc_type_2'}
    req_body = {
        'query': {'range': {'price': {'gte': 100, 'lt': 300}}},
        'from': 0, 'size': 2}
    request.extend([req_head, req_body])

    resp = es.msearch(body=request)
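Whichever form you pass, the msearch response holds one entry per header/body pair under resp['responses'], in the same order, so reading the results back looks roughly like this (a sketch, assuming every sub-search succeeded):

for sub_resp in resp['responses']:
    for hit in sub_resp['hits']['hits']:
        print(hit['_source'])    # the matched document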
If you are using elasticsearch-dsl, you can use the class MultiSearch.

Example from the documentation:
from elasticsearch_dsl import MultiSearch, Search
ms = MultiSearch(index='blogs')
ms = ms.add(Search().filter('term', tags='python'))
ms = ms.add(Search().filter('term', tags='elasticsearch'))
responses = ms.execute()
for response in responses:
    print("Results for query %r." % response.search.query)
    for hit in response:
        print(hit.title)
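If the searches target different indices, each Search can carry its own index instead of setting one on MultiSearch; a small variation on the doc example, assuming client is an Elasticsearch connection and the index names are placeholders:

ms = MultiSearch(using=client)
ms = ms.add(Search(index='blogs').filter('term', tags='python'))
ms = ms.add(Search(index='comments').filter('term', tags='python'))
responses = ms.execute()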
Here is what I came up with. I am using the same document type and index, so I optimized the code to run multiple queries with the same header:
from elasticsearch import Elasticsearch
from elasticsearch import exceptions as es_exceptions
import json
import logging
import time

RETRY_ATTEMPTS = 10
RECONNECT_SLEEP_SECS = 0.5

def msearch(es_conn, queries, index, doc_type, retries=0):
    """
    ES multi-search query
    :param es_conn: Elasticsearch, client connection
    :param queries: list of dict, es queries
    :param index: str, index to query against
    :param doc_type: str, defined doc type i.e. event
    :param retries: int, current retry attempt
    :return: list, found docs
    """
    search_header = json.dumps({'index': index, 'type': doc_type})
    request = ''
    for q in queries:
        # request head, body pairs
        request += '{}\n{}\n'.format(search_header, json.dumps(q))
    try:
        resp = es_conn.msearch(body=request, index=index)
        found = [r['hits']['hits'] for r in resp['responses']]
    except (es_exceptions.ConnectionTimeout, es_exceptions.ConnectionError,
            es_exceptions.TransportError):  # pragma: no cover
        logging.warning("msearch connection failed, retrying...")  # retry on timeout
        if retries > RETRY_ATTEMPTS:  # pragma: no cover
            raise
        time.sleep(RECONNECT_SLEEP_SECS)
        found = msearch(es_conn, queries=queries, index=index,
                        doc_type=doc_type, retries=retries + 1)
    except Exception as e:  # pragma: no cover
        logging.critical("msearch error {} on query {}".format(e, queries))
        raise
    return found

es_conn = Elasticsearch()
queries = []
queries.append(
    {"min_score": 2.0, "query": {"bool": {"should": [{"match": {"name.tokenized": {"query": "batman"}}}]}}}
)
queries.append(
    {"min_score": 1.0, "query": {"bool": {"should": [{"match": {"name.tokenized": {"query": "ironman"}}}]}}}
)
queries.append(
    {"track_scores": True, "min_score": 9.0, "query":
        {"bool": {"should": [{"match": {"name": {"query": "not-findable"}}}]}}}
)

q_results = msearch(es_conn, queries, index='pipeliner_current', doc_type='event')
This may be what some of you are looking for if you want to do multiple queries against the same index and document type.
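One thing to watch for: msearch reports failures per sub-request instead of raising, so any entry in resp['responses'] can contain an 'error' key rather than hits. A small sketch of guarding for that when collecting results (reusing resp from the function above):

found = []
for sub_resp in resp['responses']:
    if 'error' in sub_resp:                        # this sub-search failed server-side
        logging.warning("msearch sub-query failed: %s", sub_resp['error'])
        found.append([])
    else:
        found.append(sub_resp['hits']['hits'])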