在 Node.js 中使用 scroll(滚动)显示 Elasticsearch 的所有结果

Elasticsearch show all results using scroll in node js

我基本上是想显示一个索引类型的所有记录。现在,如果您在查询中使用 match_all(),则 elasticsearch 默认显示 10 个结果。可以使用滚动显示所有结果。我正在尝试实现滚动 api,但无法使其工作。它只显示 10 个结果,我的代码:

/**
 * Runs a `match_all` search against index 'test' / type 'records' with a
 * 10s scroll window and hands hits to `callback`.
 *
 * As written, `callback` only ever receives the hits of the initial search
 * response (see the note on the `client.scroll` call below).
 *
 * @param {*} searchData - Not read anywhere in this function.
 * @param {Function} callback - Called with `resp.hits.hits` from the initial search.
 */
module.exports.searchAll = function (searchData, callback) {

client.search({
    index: 'test',
    type: 'records',
    // Ask the server for a scroll id along with the first page of results.
    scroll: '10s',
    //search_type: 'scan', //if I use search_type then it requires size otherwise it shows 0 result
    body: {
        query: {
            "match_all": {}
        }
    }
}, function (err, resp) {
    // `err` is never checked; `resp` is dereferenced unconditionally.
    // NOTE: `callback(resp.hits.hits)` is evaluated right here, with the hits
    // of the FIRST search response, and its return value (not `callback`
    // itself) is what gets passed to `client.scroll` as the completion
    // handler. The response of the scroll request is therefore never read.
    client.scroll({
        scrollId: resp._scroll_id,
        scroll: '10s'
    }, callback(resp.hits.hits));
});
}

有人能帮忙吗?

您需要重复调用 client.scroll,直到不再返回更多记录。Elasticsearch 文档中有一个不错的示例。我在下面复制了他们的示例代码,并稍作修改以匹配您的问题:

// Accumulates every hit returned by the initial search and the follow-up scroll calls.
var allRecords = [];

// First we do a search and specify a scroll timeout; the same callback then
// keeps requesting the next page until the result set is exhausted.
client.search({
  index: 'test',
  type: 'records',
  scroll: '10s',
  body: {
     query: {
         "match_all": {}
     }
  }
}, function getMoreUntilDone(error, response) {
  // Without this guard a failed request would crash on `response.hits`.
  if (error) {
    console.error('scroll failed', error);
    return;
  }

  // Collect all the records of the current page.
  var pageHits = response.hits.hits;
  pageHits.forEach(function (hit) {
    allRecords.push(hit);
  });

  // `hits.total` is a plain number on older clusters but an object
  // ({ value, relation }) on Elasticsearch 7+, so it is only trusted when it
  // is numeric. An empty page is the universal "no more data" signal and
  // guarantees termination in every case.
  var total = response.hits.total;
  var reachedTotal = typeof total === 'number' && allRecords.length >= total;

  if (pageHits.length === 0 || reachedTotal) {
    console.log('all done', allRecords);
    return;
  }

  // Now we can call scroll over and over.
  client.scroll({
    scrollId: response._scroll_id,
    scroll: '10s'
  }, getMoreUntilDone);
});

感谢 @Ceilingfish。下面是对上述代码使用 await 修改后的 ES6 版本:
// Every hit collected across the initial search and all scroll pages.
let allRecords = [];

// First we do a search, and specify a scroll timeout.
// `_source: false` asks for hit metadata only (no document bodies).
let { _scroll_id, hits } = await esclient.search({
    index: 'test',
    type: 'records',
    scroll: '10s',
    body: {
        query: {
            "match_all": {}
        },
        _source: false
    }
});

// Keep scrolling until a page comes back empty.
while (hits && hits.hits.length) {
    // Append all new hits
    allRecords.push(...hits.hits);

    // `hits.total` is a number on older clusters and `{ value, relation }` on
    // Elasticsearch 7+; normalise it so the progress line stays readable.
    const total = (hits.total !== null && typeof hits.total === 'object')
        ? hits.total.value
        : hits.total;
    console.log(`${allRecords.length} of ${total}`);

    // Re-assign the existing bindings instead of re-declaring them.
    ({ _scroll_id, hits } = await esclient.scroll({
        scrollId: _scroll_id,
        scroll: '10s'
    }));
}

console.log(`Complete: ${allRecords.length} records retrieved`);

这就是我使用 Promises 的方式

/**
 * Helper bound to one Elasticsearch endpoint.
 *
 * Stores the endpoint URL (taken from the surrounding `esUrl`), the index
 * name and type used by its search helpers, and a client created with
 * `elasticsearch.Client` pointed at that URL.
 *
 * @constructor
 */
function EsHelper() {
    Object.assign(this, {
        esUrl: esUrl,
        indexName: "myIndex",
        type: "myIndexType",
        elasticClient: new elasticsearch.Client({ host: esUrl }),
    });
}

/**
 * Appends the hits of `response` to `allHits`, then keeps requesting further
 * scroll pages (10s keep-alive each) until the result set is exhausted.
 *
 * The loop ends when a page comes back empty, or earlier when `hits.total`
 * is a plain number that has been reached. `hits.total` is only trusted when
 * numeric because Elasticsearch 7+ reports it as `{ value, relation }`; the
 * empty-page check guarantees termination in every case.
 *
 * @param {object} response - A search or scroll response (`hits.hits`, `hits.total`, `_scroll_id`).
 * @param {Array} allHits - Accumulator; mutated in place.
 * @returns {Promise<Array>} Resolves with `allHits`; rejects if reading the
 *   response or a scroll request fails.
 */
EsHelper.prototype.scrollData = async function(response, allHits) {
    let page = response;
    // Iterating instead of recursing keeps the promise chain flat for long scrolls.
    for (;;) {
        const pageHits = page.hits.hits;
        for (const hit of pageHits) {
            allHits.push(hit);
        }

        const total = page.hits.total;
        const reachedTotal = typeof total === 'number' && allHits.length >= total;
        if (pageHits.length === 0 || reachedTotal) {
            return allHits;
        }

        page = await this.elasticClient.scroll({
            scroll_id: page._scroll_id,
            scroll: '10s',
        });
    }
};

/**
 * Runs `query` against this helper's index/type with a 10s scroll window and
 * collects the hits of every page via `scrollData`.
 *
 * @param {object} query - Request body passed to the search call as `body`.
 * @returns {Promise<Array>} Resolves with whatever `scrollData` resolves with
 *   (the accumulated hits array); rejects if the search or scrolling fails.
 */
EsHelper.prototype.runSearchWithScroll = function(query) {
    const allHits = [];
    return this.elasticClient
        .search({
            index: this.indexName,
            type: this.type,
            scroll: '10s',
            body: query
        })
        // No trailing identity `.then` is needed: the chain already resolves
        // with the value produced by scrollData.
        .then((response) => this.scrollData(response, allHits));
};

有更好的方法吗?

当 Elasticsearch 中的结果超过 10000 条时,Node.js 中的查询会失败。下面是我使用 scroll 的方式。

/**
 * Fetches every document of index 'test' / type 'records' using the scroll
 * API (pages of 10000, 10s keep-alive on the search and 30s on each scroll).
 *
 * Paging stops when a page comes back empty, or earlier when `hits.total` is
 * a plain number that has been reached. `hits.total` is only trusted when
 * numeric because Elasticsearch 7+ reports it as `{ value, relation }`.
 *
 * @returns {Promise<{hits: {hits: Array}}>} An object shaped like a search
 *   response whose `hits.hits` holds the hits of all pages, in order.
 */
async function getResultsFromElastic() {
    // Declared locally: assigning to an undeclared `searchQuery` would create
    // an implicit global (and throws in strict mode / ES modules).
    const searchQuery = {
        index: 'test',
        type: 'records',
        scroll: '10s',
        size: 10000,
        body: {
            query: {
                "match_all": {}
            }
        }
    };

    const allHits = [];
    let response = await esclient.search(searchQuery);

    for (;;) {
        const pageHits = response.hits.hits;
        // Push in place rather than re-allocating the whole array with concat.
        for (const hit of pageHits) {
            allHits.push(hit);
        }

        const total = response.hits.total;
        const reachedTotal = typeof total === 'number' && allHits.length >= total;
        if (pageHits.length === 0 || reachedTotal) {
            break;
        }

        // Get the next response if there are more to fetch.
        response = await esclient.scroll({
            scrollId: response._scroll_id,
            scroll: '30s'
        });
    }

    return { hits: { hits: allHits } };
}

这里已经有很多写得很好的答案可以解决这个问题。但是如果有人正在寻找开箱即用的解决方案,可以使用这个包 - https://github.com/alcacoop/elasticsearch-scroll-stream

用法非常简单,而且效果很好。下面是我从他们的官方文档中摘录的一个例子。

const elasticsearch = require('elasticsearch');
const ElasticsearchScrollStream = require('elasticsearch-scroll-stream');

const client = new elasticsearch.Client();

// Options for the scroll stream: target index/type, a 10s scroll keep-alive,
// a page size of '50', only the `name` source field, and the query string
// `name:*`.
const scrollOptions = {
  index: 'your-index',
  type: 'your-type',
  scroll: '10s',
  size: '50',
  _source: ['name'],
  q: 'name:*'
};

const es_stream = new ElasticsearchScrollStream(client, scrollOptions);

// Forward everything the stream emits to standard output.
es_stream.pipe(process.stdout);

es_stream.on('data', (data) => {
  // Process your results here
});

es_stream.on('end', () => {
  console.log("End");
});

使用 Node.js 客户端和 async/await。

通过 scroll 查询从 Elasticsearch 中获取所有数据:
const elasticsearch = require('@elastic/elasticsearch');
/**
 * Creates an Elasticsearch client for the node at http://192.168.1.1:7200.
 *
 * @returns {Promise<object>} Resolves with a new `elasticsearch.Client`.
 */
async function esconnection(){
  // Constructing the client is synchronous, so there is nothing to await.
  // The function stays `async` so existing `await esconnection()` callers
  // keep receiving a promise.
  return new elasticsearch.Client({
    node: "http://192.168.1.1:7200"
  });
}
/**
 * Reads the `_source` of every document in index 'esIndex' / type
 * 'esIndexType' using the scroll API (pages of 100).
 *
 * Paging is driven by the number of hits on each page rather than by
 * `hits.total`: on Elasticsearch 7+ `hits.total` is an object, which made the
 * `> 0` comparison false and the loop body never run.
 *
 * Errors are logged and swallowed, in which case the function resolves with
 * `undefined`. The client is closed on both the success and failure paths.
 *
 * @returns {Promise<Array|undefined>} The collected `_source` objects, or
 *   `undefined` if anything failed.
 */
async function getAllUserList(){
    let es;
    try{
        const userArray = [];
        const query = {
            "query":{
                "match_all": {}
            }
        };
        es = await esconnection();
        let {body} = await es.search({
            index: 'esIndex',
            type: "esIndexType",
            scroll: '2m', // How long a consistent view of the index is kept for the scrolled search
            size: 100,    // Number of hits to return per page (default: 10)
            body: query
        });

        // An empty page means the scroll is exhausted.
        while (body['hits']['hits'].length > 0){
            // Iterate the current page itself so pages of any length are read in full.
            for (const hit of body['hits']['hits']){
                userArray.push(hit['_source']);
            }
            ({body} = await es.scroll({
                scrollId: body['_scroll_id'],
                scroll: '10s'
            }));
        }
        return userArray;
    }  catch(error){
        console.log("Code not working properly: ",`${error}`)
    } finally {
        // Release the connection even when the search or a scroll call failed.
        if (es) {
            es.close();
        }
    }
}

我想我们也可以使用 yield,例如

/**
 * Lazily yields every hit matching `searchQuery`, fetching further pages
 * through the scroll API only as the consumer asks for more records.
 *
 * Iteration ends when a page comes back empty. It ends one request earlier
 * when the response reports an exact total (`hits.total.relation === 'eq'`)
 * that has already been yielded. A capped total (`relation: 'gte'`) is a
 * lower bound and is deliberately not used as a stop condition.
 *
 * @param {object} elasticClient - Client whose `search` / `scroll` resolve with `{ body }`.
 * @param {{index: string, scroll: string, size: number, body: object}} searchQuery
 *   Search parameters; `scroll` is reused as the keep-alive of each scroll call.
 * @yields {object} One hit at a time, in the order returned by the server.
 */
async function* getRecords(elasticClient, searchQuery) {
  let { body } = await elasticClient.search(searchQuery);
  let yielded = 0;

  while (body.hits.hits.length > 0) {
    for (const hit of body.hits.hits) {
      yield hit;
    }
    yielded += body.hits.hits.length;

    const total = body.hits.total;
    const isExactTotal = total !== null && typeof total === 'object' && total.relation === 'eq';
    if (isExactTotal && yielded >= total.value) {
      return;
    }

    ({ body } = await elasticClient.scroll({
      scrollId: body._scroll_id,
      scroll: searchQuery.scroll
    }));
  }
}

然后是你的查询

// Match every document; the stray closing brace that made this line a
// syntax error has been removed.
const body = { query: { "match_all": {} } };
// Stream the records one by one; further pages are fetched by getRecords as
// the loop advances.
for await (const record of getRecords(elasticClient, { index: 'test', scroll: '30s', size: 100, body })) {
    console.log(record);
}