How to create request body for Python Elasticsearch mSearch

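I am trying to run a multi search request with the Elasticsearch Python client. I can run a single search correctly, but I can't figure out how to format the request for msearch. According to the documentation, the body of the request needs to be formatted as: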

The request definitions (metadata-search request definition pairs), as either a newline separated string, or a sequence of dicts to serialize (one per row).

What is the best way to create this request body? I have been searching for examples but can't seem to find any.

Figured it out! Here is what I did, for anyone else...

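import json
from elasticsearch import Elasticsearch

es = Elasticsearch("myurl")
query_list = ""
query_count = 0
for obj in my_list:
    query = constructQuery(obj)  # constructQuery (defined elsewhere) returns one query dict
    query_count += 1
    query_list += json.dumps({}) + "\n"     # header line; empty means "use the index passed below"
    query_list += json.dumps(query) + "\n"  # body line; the request must end with a newline
    if query_count == 20:
        # send the batch of 20 searches, then start a new one
        es.msearch(index="m_index", body=query_list)
        query_list = ""
        query_count = 0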

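I was getting this wrong at first and had to add the index twice. Even when using the Python client, you still have to include the index (header) part described in the original docs. Works now!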
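The batching idea above can also be written as a small generator. This is a minimal sketch, assuming every search targets the index passed to msearch (so each header is an empty {}); iter_msearch_batches, batch_size and the sample queries are hypothetical names and data, not part of the client.

```python
import json


def iter_msearch_batches(queries, batch_size=20):
    """Yield newline-delimited msearch bodies holding at most batch_size searches each."""
    lines = []
    for count, query in enumerate(queries, start=1):
        lines.append(json.dumps({}))     # empty header: use the index given to msearch
        lines.append(json.dumps(query))  # the search itself
        if count % batch_size == 0:
            yield "\n".join(lines) + "\n"
            lines = []
    if lines:  # flush the final, partially filled batch
        yield "\n".join(lines) + "\n"


# 45 made-up match queries are split into batches of 20, 20 and 5 searches.
queries = [{"query": {"match": {"name": "item-%d" % i}}} for i in range(45)]
bodies = list(iter_msearch_batches(queries))
```

Each body could then be sent with something like es.msearch(index="m_index", body=body).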

If you follow the demo in the official doc (even though it is written for the Bulk API), you will see how to construct your request in Python with the Elasticsearch client:

Here is the newline separated string way:

import json

def msearch():
    es = get_es_instance()

    search_arr = []
    # req_head
    search_arr.append({'index': 'my_test_index', 'type': 'doc_type_1'})
    # req_body
    search_arr.append({"query": {"term": {"text": "bag"}}, 'from': 0, 'size': 2})

    # req_head
    search_arr.append({'index': 'my_test_index', 'type': 'doc_type_2'})
    # req_body
    search_arr.append({"query": {"match_all": {}}, 'from': 0, 'size': 2})

    # one JSON document per line, each terminated by a newline
    request = ''
    for each in search_arr:
        request += '%s\n' % json.dumps(each)

    # as you can see, you just need to feed the <body> parameter,
    # and don't need to specify the <index> and <doc_type> as usual
    resp = es.msearch(body=request)
    return resp

As you can see, the final request is built from several req_units. Each req_unit is constructed as follows:

request_header(search control about index_name, optional mapping-types, search-types etc.)\n
request_body(which involves query detail about this request)\n
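The header/body layout above can be produced mechanically from a list of pairs. The following is a minimal sketch using only the standard library; build_msearch_body and the pairs variable are hypothetical names introduced for illustration, and the queries reuse the ones from the example above.

```python
import json


def build_msearch_body(pairs):
    """Serialize (header, body) pairs into a newline-delimited msearch string."""
    lines = []
    for header, body in pairs:
        lines.append(json.dumps(header))  # request_header line
        lines.append(json.dumps(body))    # request_body line
    # Every line, including the last one, is terminated by "\n".
    return "".join(line + "\n" for line in lines)


pairs = [
    ({"index": "my_test_index"}, {"query": {"term": {"text": "bag"}}, "from": 0, "size": 2}),
    ({"index": "my_test_index"}, {"query": {"match_all": {}}, "from": 0, "size": 2}),
]
request = build_msearch_body(pairs)
print(request)
```

The resulting string is the kind of value that would be passed as the body argument of es.msearch.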

The sequence of dicts to serialize way is almost the same as the previous one, except that you don't need to convert it to a string:

def msearch():
    es = get_es_instance()

    request = []

    req_head = {'index': 'my_test_index', 'type': 'doc_type_1'}
    req_body = {
        'query': {'term': {'text': 'bag'}},
        'from': 0, 'size': 2}
    request.extend([req_head, req_body])

    req_head = {'index': 'my_test_index', 'type': 'doc_type_2'}
    req_body = {
        'query': {'range': {'price': {'gte': 100, 'lt': 300}}},
        'from': 0, 'size': 2}
    request.extend([req_head, req_body])

    resp = es.msearch(body=request)

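The call returns a dict whose 'responses' list holds one result per search, in the same order as the requests. Read more about msearch in the Elasticsearch documentation.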
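To make the shape of the result concrete, here is a minimal sketch of reading it. It assumes the response behaves like a plain dict with a 'responses' list (one entry per search, in request order) and that a failed search carries an 'error' key instead of 'hits'. collect_hits is a hypothetical helper, and the sample dict is hand-written, not real cluster output.

```python
def collect_hits(resp):
    """Return one list of hits per search; a failed search yields an empty list."""
    results = []
    for item in resp.get("responses", []):
        if "error" in item:
            results.append([])  # this search failed; the others may still have hits
        else:
            results.append(item["hits"]["hits"])
    return results


# Hand-written stand-in for an msearch response.
sample = {
    "responses": [
        {"status": 200, "hits": {"total": 1, "hits": [{"_id": "1", "_source": {"text": "bag"}}]}},
        {"status": 400, "error": {"type": "parsing_exception"}},
    ]
}
hits_per_search = collect_hits(sample)
```

Checking each entry separately matters because one search in the batch can fail while the others succeed.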

If you are using elasticsearch-dsl, you can use the MultiSearch class.

Example from the documentation:

from elasticsearch_dsl import MultiSearch, Search

ms = MultiSearch(index='blogs')

ms = ms.add(Search().filter('term', tags='python'))
ms = ms.add(Search().filter('term', tags='elasticsearch'))

responses = ms.execute()

for response in responses:
    print("Results for query %r." % response.search.query)
    for hit in response:
        print(hit.title)

Here is what I came up with. I use the same document type and index, so I optimized the code to run multiple queries with the same header:

from elasticsearch import Elasticsearch
from elasticsearch import exceptions as es_exceptions
import json
import logging
import time

RETRY_ATTEMPTS = 10
RECONNECT_SLEEP_SECS = 0.5

def msearch(es_conn, queries, index, doc_type, retries=0):
    """
    Es multi-search query
    :param es_conn: Elasticsearch, client connection
    :param queries: list of dict, es queries
    :param index: str, index to query against
    :param doc_type: str, defined doc type i.e. event
    :param retries: int, current retry attempt
    :return: list, found docs
    """
    search_header = json.dumps({'index': index, 'type': doc_type})
    request = ''
    for q in queries:
        # request head, body pairs
        request += '{}\n{}\n'.format(search_header, json.dumps(q))
    try:
        resp = es_conn.msearch(body=request, index=index)
        found = [r['hits']['hits'] for r in resp['responses']]
    except (es_exceptions.ConnectionTimeout, es_exceptions.ConnectionError,
            es_exceptions.TransportError):  # pragma: no cover
        logging.warning("msearch connection failed, retrying...")  # Retry on timeout
        if retries > RETRY_ATTEMPTS:  # pragma: no cover
            raise
        time.sleep(RECONNECT_SLEEP_SECS)
        # pass every argument through again, bumping only the retry counter
        found = msearch(es_conn, queries, index, doc_type, retries=retries + 1)
    except Exception as e:  # pragma: no cover
        logging.critical("msearch error {} on query {}".format(e, queries))
        raise
    return found

es_conn = Elasticsearch()
queries = []
queries.append(
    {"min_score": 2.0, "query": {"bool": {"should": [{"match": {"name.tokenized": {"query": "batman"}}}]}}}
)
queries.append(
    {"min_score": 1.0, "query": {"bool": {"should": [{"match": {"name.tokenized": {"query": "ironman"}}}]}}}
)
queries.append(
    {"track_scores": True, "min_score": 9.0, "query":
        {"bool": {"should": [{"match": {"name": {"query": "not-findable"}}}]}}}
)
q_results = msearch(es_conn, queries, index='pipeliner_current', doc_type='event')

If you want to run multiple queries against the same index and doc type, this may be what some of you are looking for.
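The retry logic in the function above could also be expressed as a loop instead of recursion. This is only a sketch under stated assumptions: call_with_retries is a hypothetical helper, and the demo uses Python's built-in ConnectionError as a stand-in for the client's exception classes so that it runs without a cluster.

```python
import time


def call_with_retries(func, retry_on, attempts=10, sleep_secs=0.5):
    """Call func(); on an exception listed in retry_on, sleep and try again."""
    for attempt in range(attempts + 1):
        try:
            return func()
        except retry_on:
            if attempt == attempts:  # retries exhausted: re-raise the last error
                raise
            time.sleep(sleep_secs)


# Simulated flaky call: fails twice, then succeeds.
calls = {"n": 0}


def flaky():
    calls["n"] += 1
    if calls["n"] < 3:
        raise ConnectionError("simulated failure")
    return "ok"


result = call_with_retries(flaky, retry_on=(ConnectionError,), sleep_secs=0)
```

With a real client, func would wrap the es_conn.msearch call and retry_on would list the exception classes from elasticsearch.exceptions that you consider transient.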