如何正确检查滚动结束?

how to correctly check for scroll end?

我正在使用 scroll method 批量获取大量事件。我不知道如何正确停止滚动。

我现在正在做的(有效)是检查 TransportError 表示滚动尝试失败:

scanResp= es.search(
    index="nessus_all",
    doc_type="marker",
    body={"query": {"match_all": {}}},
    search_type="scan",
    scroll="10m"
)
scrollId= scanResp['_scroll_id']
while True:
    try:
        response = es.scroll(scroll_id=scrollId, scroll= "10m")
        # process results
    except Exception as e:
        log.debug("ended scroll: {e}".format(e=e))
        break
# we are done with the search

这会在 /var/log/elasticsearch/security.log 中产生一个错误:

[2015-02-16 09:36:07,110][DEBUG][action.search.type       ] [eu4] [2791] Failed to execute query phase
org.elasticsearch.transport.RemoteTransportException: [eu5][inet[/10.81.147.186:9300]][indices:data/read/search[phase/scan/scroll]]
Caused by: org.elasticsearch.search.SearchContextMissingException: No search context found for id [2791]
        at org.elasticsearch.search.SearchService.findContext(SearchService.java:502)
        at org.elasticsearch.search.SearchService.executeScan(SearchService.java:236)
        at org.elasticsearch.search.action.SearchServiceTransportAction$SearchScanScrollTransportHandler.messageReceived(SearchServiceTransportAction.java:939)
        at org.elasticsearch.search.action.SearchServiceTransportAction$SearchScanScrollTransportHandler.messageReceived(SearchServiceTransportAction.java:930)
        at org.elasticsearch.transport.netty.MessageChannelHandler$RequestHandler.run(MessageChannelHandler.java:275)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        at java.lang.Thread.run(Thread.java:745)

而且一般来说似乎不是正确的方法?

在仔细查看 .scroll() 之后,我想出了

scanResp= es.search(
    index="nessus_all",
    doc_type="marker",
    body={"query": {"match_all": {}}},
    search_type="scan",
    scroll="10m"
)
scrollId= scanResp['_scroll_id']
totalhits = scanResp['hits']['total']

while totalhits > 0:
    response = es.scroll(scroll_id=scrollId, scroll= "10m")
    # process results
    totalhits -= len(response['hits']['hits'])

# we are done with the search

根据 Elasticsearch's Scroll documentation(从 5.1 版开始):

Each call to the scroll API returns the next batch of results until there are no more results left to return, ie the hits array is empty.

所以,我认为最好的方法是检查 len(response['hits']['hits'])

更具体的例子:

response = es.search(
    index='index_name',
    body=<your query here>,
    scroll='10m'
)
scroll_id = response['_scroll_id']

while len(response['hits']['hits']):
    response = es.scroll(scroll_id=scroll_id, scroll='10m')
    # process results