Elasticsearch:在 Node.js 中使用滚动(scroll)API 显示所有结果
Elasticsearch show all results using scroll in node js
我基本上是想显示一个索引类型的所有记录。现在,如果您在查询中使用 match_all(),则 elasticsearch 默认显示 10 个结果。可以使用滚动显示所有结果。我正在尝试实现滚动 api,但无法使其工作。它只显示 10 个结果,我的代码:
/**
 * Question sample: runs a match_all search against index 'test' / type
 * 'records' with a 10s scroll window, then issues a single client.scroll() call.
 *
 * @param {*} searchData - Not referenced anywhere in this function body.
 * @param {Function} callback - Called with the hits of the initial search response.
 */
module.exports.searchAll = function (searchData, callback) {
client.search({
index: 'test',
type: 'records',
scroll: '10s',
//search_type: 'scan', //if I use search_type then it requires size otherwise it shows 0 result
body: {
query: {
"match_all": {}
}
}
}, function (err, resp) {
// NOTE: `err` is never inspected before `resp` is dereferenced below.
client.scroll({
scrollId: resp._scroll_id,
scroll: '10s'
// NOTE: `callback(...)` on the next line is a call expression, so it runs
// immediately with the hits of the initial search response only; its return
// value (not `callback` itself) is what client.scroll receives as second argument.
}, callback(resp.hits.hits));
});
}
有人能帮忙吗?
您需要重复调用 client.scroll,直到不再有记录返回为止。Elasticsearch 官方文档中有一个不错的示例。我在下面复制了他们的示例代码,并稍作修改以匹配您的问题:
// Accumulates every hit returned by the initial search and by each scroll page.
const allRecords = [];

// First we do a search, and specify a scroll timeout so that a scroll id is
// returned and can be passed to client.scroll() for the following pages.
client.search({
  index: 'test',
  type: 'records',
  scroll: '10s',
  body: {
    query: {
      "match_all": {}
    }
  }
}, function getMoreUntilDone(error, response) {
  // Without this guard a failed request would crash on `response.hits`.
  if (error) {
    console.error('scroll search failed', error);
    return;
  }

  // Collect all the records of the current page.
  const page = response.hits.hits;
  for (const hit of page) {
    allRecords.push(hit);
  }

  // `hits.total` is a plain number on Elasticsearch <= 6 and an object of the
  // form { value, relation } on 7+; accept both shapes.
  const reported = response.hits.total;
  const total = reported !== null && typeof reported === 'object' ? reported.value : reported;

  // An empty page means the scroll is exhausted. Checking it as well keeps the
  // loop finite even when the reported total never matches what was collected.
  const isDone = page.length === 0 || allRecords.length >= total;
  if (isDone) {
    console.log('all done', allRecords);
    return;
  }

  // Now we can call scroll over and over, re-entering this same handler.
  client.scroll({
    scrollId: response._scroll_id,
    scroll: '10s'
  }, getMoreUntilDone);
});
感谢 @Ceilingfish。下面是上述代码使用 await 改写后的 ES6 版本:
// Accumulates the hits of every page; only mutated, never reassigned.
const allRecords = [];

// First we do a search, and specify a scroll timeout.
// Declared once with `let`; the loop below reassigns both bindings instead of
// re-declaring them with `var` on every iteration.
let { _scroll_id, hits } = await esclient.search({
  index: 'test',
  type: 'records',
  scroll: '10s',
  body: {
    query: {
      "match_all": {}
    },
    _source: false
  }
});

// Keep scrolling for as long as the latest response still carries hits.
while (hits && hits.hits.length) {
  // Append all new hits
  allRecords.push(...hits.hits);

  console.log(`${allRecords.length} of ${hits.total}`);

  // Destructuring assignment (note the wrapping parentheses) feeds the scroll
  // id of the latest response into the next request.
  ({ _scroll_id, hits } = await esclient.scroll({
    scrollId: _scroll_id,
    scroll: '10s'
  }));
}

console.log(`Complete: ${allRecords.length} records retrieved`);
这就是我使用 Promises 的方式
/**
 * Helper that owns an Elasticsearch client and can run a search whose results
 * are collected across all scroll pages.
 *
 * Reads `esUrl` and `elasticsearch` from the enclosing scope.
 */
function EsHelper() {
  this.esUrl = esUrl;
  this.indexName = "myIndex";
  this.type = "myIndexType";
  this.elasticClient = new elasticsearch.Client({
    host: esUrl
  });
}

/**
 * Appends the hits of `response` to `allHits`, then keeps requesting scroll
 * pages and appending their hits until everything has been collected.
 *
 * @param {object} response - A search or scroll response (`_scroll_id`, `hits.total`, `hits.hits`).
 * @param {Array<object>} allHits - Accumulator; mutated in place.
 * @returns {Promise<Array<object>>} Resolves with `allHits`; rejects if a scroll request fails.
 */
EsHelper.prototype.scrollData = async function (response, allHits) {
  let current = response;

  // Iterating (instead of nesting one promise per page) keeps the promise
  // chain flat no matter how many pages there are.
  for (;;) {
    const page = current.hits.hits;
    for (const hit of page) {
      allHits.push(hit);
    }

    // `hits.total` is a number on Elasticsearch <= 6 and { value, relation } on 7+.
    const reported = current.hits.total;
    const total = reported !== null && typeof reported === 'object' ? reported.value : reported;

    // An empty page means the scroll is exhausted; checking it guarantees
    // termination even if the reported total never matches the collected count.
    if (page.length === 0 || allHits.length >= total) {
      return allHits;
    }

    current = await this.elasticClient.scroll({
      scroll_id: current._scroll_id,
      scroll: '10s',
    });
  }
};

/**
 * Runs `query` against the configured index/type with a 10s scroll window and
 * collects the hits of every page.
 *
 * @param {object} query - Request body passed to the search call.
 * @returns {Promise<Array<object>>} Resolves with all collected hits.
 */
EsHelper.prototype.runSearchWithScroll = function (query) {
  return this.elasticClient.search({
    index: this.indexName,
    type: this.type,
    scroll: '10s',
    body: query
  }).then((response) => this.scrollData(response, []));
};
有更好的方法吗?
当 Elasticsearch 中的结果超过 10000 条时,Node.js 端的查询会失败。下面是我使用滚动的方式。
/**
 * Fetches every document of index 'test' / type 'records' by running a
 * match_all search (page size 10000) and following the scroll cursor.
 *
 * Uses the `esclient` binding from the enclosing scope.
 *
 * @returns {Promise<{hits: {hits: Array<object>}}>} Object whose `hits.hits`
 *   holds the hits of all pages in the order they were received.
 */
async function getResultsFromElastic() {
  // Declared locally; assigning to an undeclared name would create an implicit
  // global (and throws a ReferenceError in strict mode / ES modules).
  const searchQuery = {
    index: 'test',
    type: 'records',
    scroll: '10s',
    size: 10000,
    body: {
      query: {
        "match_all": {}
      }
    }
  };

  const collected = [];
  let response = await esclient.search(searchQuery);

  for (;;) {
    const page = response.hits.hits;
    // Push in place rather than re-copying the whole array for every page.
    for (const hit of page) {
      collected.push(hit);
    }

    // `hits.total` is a number on Elasticsearch <= 6 and { value, relation } on 7+.
    const reported = response.hits.total;
    const total = reported !== null && typeof reported === 'object' ? reported.value : reported;

    // An empty page means the scroll is exhausted; this keeps the loop finite
    // even when the reported total never equals the collected count.
    if (page.length === 0 || collected.length >= total) {
      break;
    }

    // Get the next response if there are more to fetch.
    response = await esclient.scroll({
      scrollId: response._scroll_id,
      scroll: '30s'
    });
  }

  return { hits: { hits: collected } };
}
这里有很多 well-written 答案可以解决问题。但是如果有人正在寻找开箱即用的解决方案,他们可以到这里来使用这个包 - https://github.com/alcacoop/elasticsearch-scroll-stream
用法非常简单,而且效果很好。下面是我从他们的官方文档中摘录的一个例子。
const elasticsearch = require('elasticsearch');
const ElasticsearchScrollStream = require('elasticsearch-scroll-stream');
// Client constructed without any explicit options.
const client = new elasticsearch.Client();

// Scroll stream over the query below; the second argument carries the search
// parameters (index, type, scroll window, page size, source filter, query string).
const es_stream = new ElasticsearchScrollStream(client, {
  index: 'your-index',
  type: 'your-type',
  scroll: '10s',
  size: '50',
  _source: ['name'],
  q: 'name:*'
});

// Forward everything the stream emits to standard output.
es_stream.pipe(process.stdout);

es_stream.on('data', (data) => {
  // Process your results here
});

es_stream.on('end', () => {
  console.log("End");
});
使用 Node.js 客户端和 async/await,通过滚动查询从 Elasticsearch 中获取所有数据:
const elasticsearch = require('@elastic/elasticsearch');
/**
 * Creates an Elasticsearch client.
 *
 * Kept `async` so existing `await esconnection()` callers continue to work.
 *
 * @param {string} [node="http://192.168.1.1:7200"] - URL of the node to connect to.
 * @returns {Promise<object>} Resolves with the newly constructed client.
 */
async function esconnection(node = "http://192.168.1.1:7200") {
  // The constructor call is synchronous, so there is nothing to await here.
  return new elasticsearch.Client({ node });
}
/**
 * Collects the `_source` of every document in index 'esIndex' by running a
 * match_all search (page size 100) and following the scroll cursor until a
 * page comes back empty.
 *
 * Obtains its client from `esconnection()` and closes it before returning,
 * whether or not the requests succeeded.
 *
 * @returns {Promise<Array<object>|undefined>} All `_source` objects, or
 *   `undefined` when an error was caught (the error is logged, not rethrown).
 */
async function getAllUserList() {
  let es;
  try {
    const userArray = [];
    const query = {
      "query": {
        "match_all": {}
      }
    };

    es = await esconnection();
    let { body } = await es.search({
      index: 'esIndex',
      type: "esIndexType",
      scroll: '2m', // How long a consistent view of the index should be maintained for the scrolled search
      size: 100, // Number of hits to return per page (default: 10)
      body: query
    });

    // Loop on the size of the current page rather than on `hits.total`: the
    // total is an object ({ value, relation }) on Elasticsearch 7+, so a numeric
    // comparison against it is always false.
    while (body.hits.hits.length > 0) {
      // Iterate over the page itself so that pages of differing sizes are
      // consumed in full.
      for (const hit of body.hits.hits) {
        userArray.push(hit._source);
      }

      ({ body } = await es.scroll({
        scrollId: body._scroll_id,
        scroll: '10s'
      }));
    }

    return userArray;
  } catch (error) {
    console.log("Code not working properly: ", `${error}`);
  } finally {
    // Release the client on the error path too; `es` is still undefined when
    // esconnection() itself failed.
    if (es) {
      await es.close();
    }
  }
}
我想我们也可以使用 yield,例如
/**
 * Lazily yields every hit matching `searchQuery`, one at a time, fetching
 * further pages through the scroll API only as the consumer asks for them.
 *
 * @param {object} elasticClient - Client whose `search` and `scroll` methods
 *   resolve to an object with a `body` property holding the response.
 * @param {{index: string, scroll: string, size: number, body: object}} searchQuery
 *   Search parameters; `scroll` is reused as the timeout of each scroll request.
 * @yields {object} The next hit.
 */
async function* getRecords(elasticClient, searchQuery) {
  let { body } = await elasticClient.search(searchQuery);
  let yielded = 0;

  // An empty page means the scroll is exhausted; looping on it guarantees
  // termination even if the reported total never matches the yielded count.
  while (body.hits.hits.length > 0) {
    for (const hit of body.hits.hits) {
      yield hit;
    }
    yielded += body.hits.hits.length;

    // `hits.total` is { value, relation } on Elasticsearch 7+ and a plain
    // number on older versions; accept both.
    const reported = body.hits.total;
    const total = reported !== null && typeof reported === 'object' ? reported.value : reported;

    // Everything reported has been yielded: skip the final round trip.
    if (yielded >= total) {
      return;
    }

    ({ body } = await elasticClient.scroll({
      scrollId: body._scroll_id,
      scroll: searchQuery.scroll
    }));
  }
}
然后是你的查询
// Request body for the search: match every document.
// (The original line carried one closing brace too many, which is a syntax error.)
const body = { query: { "match_all": {} } };

// Consume the async generator; pages are fetched on demand while iterating.
for await (const record of getRecords(elasticClient, { index: 'test', scroll: '30s', size: 100, body })) {
  console.log(record);
}
我基本上是想显示一个索引类型的所有记录。现在,如果您在查询中使用 match_all(),则 elasticsearch 默认显示 10 个结果。可以使用滚动显示所有结果。我正在尝试实现滚动 api,但无法使其工作。它只显示 10 个结果,我的代码:
/**
 * Question sample: runs a match_all search against index 'test' / type
 * 'records' with a 10s scroll window, then issues a single client.scroll() call.
 *
 * @param {*} searchData - Not referenced anywhere in this function body.
 * @param {Function} callback - Called with the hits of the initial search response.
 */
module.exports.searchAll = function (searchData, callback) {
client.search({
index: 'test',
type: 'records',
scroll: '10s',
//search_type: 'scan', //if I use search_type then it requires size otherwise it shows 0 result
body: {
query: {
"match_all": {}
}
}
}, function (err, resp) {
// NOTE: `err` is never inspected before `resp` is dereferenced below.
client.scroll({
scrollId: resp._scroll_id,
scroll: '10s'
// NOTE: `callback(...)` on the next line is a call expression, so it runs
// immediately with the hits of the initial search response only; its return
// value (not `callback` itself) is what client.scroll receives as second argument.
}, callback(resp.hits.hits));
});
}
有人能帮忙吗?
您需要重复调用 client.scroll,直到不再有记录返回为止。Elasticsearch 官方文档中有一个不错的示例。我在下面复制了他们的示例代码,并稍作修改以匹配您的问题:
// Accumulates every hit returned by the initial search and by each scroll page.
const allRecords = [];

// First we do a search, and specify a scroll timeout so that a scroll id is
// returned and can be passed to client.scroll() for the following pages.
client.search({
  index: 'test',
  type: 'records',
  scroll: '10s',
  body: {
    query: {
      "match_all": {}
    }
  }
}, function getMoreUntilDone(error, response) {
  // Without this guard a failed request would crash on `response.hits`.
  if (error) {
    console.error('scroll search failed', error);
    return;
  }

  // Collect all the records of the current page.
  const page = response.hits.hits;
  for (const hit of page) {
    allRecords.push(hit);
  }

  // `hits.total` is a plain number on Elasticsearch <= 6 and an object of the
  // form { value, relation } on 7+; accept both shapes.
  const reported = response.hits.total;
  const total = reported !== null && typeof reported === 'object' ? reported.value : reported;

  // An empty page means the scroll is exhausted. Checking it as well keeps the
  // loop finite even when the reported total never matches what was collected.
  const isDone = page.length === 0 || allRecords.length >= total;
  if (isDone) {
    console.log('all done', allRecords);
    return;
  }

  // Now we can call scroll over and over, re-entering this same handler.
  client.scroll({
    scrollId: response._scroll_id,
    scroll: '10s'
  }, getMoreUntilDone);
});
感谢 @Ceilingfish。下面是上述代码使用 await 改写后的 ES6 版本:
let allRecords = [];
// First we do a search, and specify a scroll timeout.
// Declared once with `let`; the loop below reassigns both bindings instead of
// re-declaring them with `var` on every iteration.
let { _scroll_id, hits } = await esclient.search({
  index: 'test',
  type: 'records',
  scroll: '10s',
  body: {
    query: {
      "match_all": {}
    },
    _source: false
  }
});

// Keep scrolling for as long as the latest response still carries hits.
while (hits && hits.hits.length) {
  // Append all new hits
  allRecords.push(...hits.hits);

  console.log(`${allRecords.length} of ${hits.total}`);

  // Destructuring assignment (note the wrapping parentheses) feeds the scroll
  // id of the latest response into the next request.
  ({ _scroll_id, hits } = await esclient.scroll({
    scrollId: _scroll_id,
    scroll: '10s'
  }));
}

console.log(`Complete: ${allRecords.length} records retrieved`);
这就是我使用 Promises 的方式
/**
 * Helper that owns an Elasticsearch client and can run a search whose results
 * are collected across all scroll pages.
 *
 * Reads `esUrl` and `elasticsearch` from the enclosing scope.
 */
function EsHelper() {
  this.esUrl = esUrl;
  this.indexName = "myIndex";
  this.type = "myIndexType";
  this.elasticClient = new elasticsearch.Client({
    host: esUrl
  });
}

/**
 * Appends the hits of `response` to `allHits`, then keeps requesting scroll
 * pages and appending their hits until everything has been collected.
 *
 * @param {object} response - A search or scroll response (`_scroll_id`, `hits.total`, `hits.hits`).
 * @param {Array<object>} allHits - Accumulator; mutated in place.
 * @returns {Promise<Array<object>>} Resolves with `allHits`; rejects if a scroll request fails.
 */
EsHelper.prototype.scrollData = async function (response, allHits) {
  let current = response;

  // Iterating (instead of nesting one promise per page) keeps the promise
  // chain flat no matter how many pages there are.
  for (;;) {
    const page = current.hits.hits;
    for (const hit of page) {
      allHits.push(hit);
    }

    // `hits.total` is a number on Elasticsearch <= 6 and { value, relation } on 7+.
    const reported = current.hits.total;
    const total = reported !== null && typeof reported === 'object' ? reported.value : reported;

    // An empty page means the scroll is exhausted; checking it guarantees
    // termination even if the reported total never matches the collected count.
    if (page.length === 0 || allHits.length >= total) {
      return allHits;
    }

    current = await this.elasticClient.scroll({
      scroll_id: current._scroll_id,
      scroll: '10s',
    });
  }
};

/**
 * Runs `query` against the configured index/type with a 10s scroll window and
 * collects the hits of every page.
 *
 * @param {object} query - Request body passed to the search call.
 * @returns {Promise<Array<object>>} Resolves with all collected hits.
 */
EsHelper.prototype.runSearchWithScroll = function (query) {
  return this.elasticClient.search({
    index: this.indexName,
    type: this.type,
    scroll: '10s',
    body: query
  }).then((response) => this.scrollData(response, []));
};
有更好的方法吗?
当 Elasticsearch 中的结果超过 10000 条时,Node.js 端的查询会失败。下面是我使用滚动的方式。
/**
 * Fetches every document of index 'test' / type 'records' by running a
 * match_all search (page size 10000) and following the scroll cursor.
 *
 * Uses the `esclient` binding from the enclosing scope.
 *
 * @returns {Promise<{hits: {hits: Array<object>}}>} Object whose `hits.hits`
 *   holds the hits of all pages in the order they were received.
 */
async function getResultsFromElastic() {
  // Declared locally; assigning to an undeclared name would create an implicit
  // global (and throws a ReferenceError in strict mode / ES modules).
  const searchQuery = {
    index: 'test',
    type: 'records',
    scroll: '10s',
    size: 10000,
    body: {
      query: {
        "match_all": {}
      }
    }
  };

  const collected = [];
  let response = await esclient.search(searchQuery);

  for (;;) {
    const page = response.hits.hits;
    // Push in place rather than re-copying the whole array for every page.
    for (const hit of page) {
      collected.push(hit);
    }

    // `hits.total` is a number on Elasticsearch <= 6 and { value, relation } on 7+.
    const reported = response.hits.total;
    const total = reported !== null && typeof reported === 'object' ? reported.value : reported;

    // An empty page means the scroll is exhausted; this keeps the loop finite
    // even when the reported total never equals the collected count.
    if (page.length === 0 || collected.length >= total) {
      break;
    }

    // Get the next response if there are more to fetch.
    response = await esclient.scroll({
      scrollId: response._scroll_id,
      scroll: '30s'
    });
  }

  return { hits: { hits: collected } };
}
这里有很多 well-written 答案可以解决问题。但是如果有人正在寻找开箱即用的解决方案,他们可以到这里来使用这个包 - https://github.com/alcacoop/elasticsearch-scroll-stream
用法非常简单,而且效果很好。下面是我从他们的官方文档中摘录的一个例子。
const elasticsearch = require('elasticsearch');
const ElasticsearchScrollStream = require('elasticsearch-scroll-stream');
// Client constructed without any explicit options.
const client = new elasticsearch.Client();

// Scroll stream over the query below; the second argument carries the search
// parameters (index, type, scroll window, page size, source filter, query string).
const es_stream = new ElasticsearchScrollStream(client, {
  index: 'your-index',
  type: 'your-type',
  scroll: '10s',
  size: '50',
  _source: ['name'],
  q: 'name:*'
});

// Forward everything the stream emits to standard output.
es_stream.pipe(process.stdout);

es_stream.on('data', (data) => {
  // Process your results here
});

es_stream.on('end', () => {
  console.log("End");
});
使用 Node.js 客户端和 async/await,通过滚动查询从 Elasticsearch 中获取所有数据:
const elasticsearch = require('@elastic/elasticsearch');
/**
 * Creates an Elasticsearch client.
 *
 * Kept `async` so existing `await esconnection()` callers continue to work.
 *
 * @param {string} [node="http://192.168.1.1:7200"] - URL of the node to connect to.
 * @returns {Promise<object>} Resolves with the newly constructed client.
 */
async function esconnection(node = "http://192.168.1.1:7200") {
  // The constructor call is synchronous, so there is nothing to await here.
  return new elasticsearch.Client({ node });
}
/**
 * Collects the `_source` of every document in index 'esIndex' by running a
 * match_all search (page size 100) and following the scroll cursor until a
 * page comes back empty.
 *
 * Obtains its client from `esconnection()` and closes it before returning,
 * whether or not the requests succeeded.
 *
 * @returns {Promise<Array<object>|undefined>} All `_source` objects, or
 *   `undefined` when an error was caught (the error is logged, not rethrown).
 */
async function getAllUserList() {
  let es;
  try {
    const userArray = [];
    const query = {
      "query": {
        "match_all": {}
      }
    };

    es = await esconnection();
    let { body } = await es.search({
      index: 'esIndex',
      type: "esIndexType",
      scroll: '2m', // How long a consistent view of the index should be maintained for the scrolled search
      size: 100, // Number of hits to return per page (default: 10)
      body: query
    });

    // Loop on the size of the current page rather than on `hits.total`: the
    // total is an object ({ value, relation }) on Elasticsearch 7+, so a numeric
    // comparison against it is always false.
    while (body.hits.hits.length > 0) {
      // Iterate over the page itself so that pages of differing sizes are
      // consumed in full.
      for (const hit of body.hits.hits) {
        userArray.push(hit._source);
      }

      ({ body } = await es.scroll({
        scrollId: body._scroll_id,
        scroll: '10s'
      }));
    }

    return userArray;
  } catch (error) {
    console.log("Code not working properly: ", `${error}`);
  } finally {
    // Release the client on the error path too; `es` is still undefined when
    // esconnection() itself failed.
    if (es) {
      await es.close();
    }
  }
}
我想我们也可以使用 yield,例如
/**
 * Lazily yields every hit matching `searchQuery`, one at a time, fetching
 * further pages through the scroll API only as the consumer asks for them.
 *
 * @param {object} elasticClient - Client whose `search` and `scroll` methods
 *   resolve to an object with a `body` property holding the response.
 * @param {{index: string, scroll: string, size: number, body: object}} searchQuery
 *   Search parameters; `scroll` is reused as the timeout of each scroll request.
 * @yields {object} The next hit.
 */
async function* getRecords(elasticClient, searchQuery) {
  let { body } = await elasticClient.search(searchQuery);
  let yielded = 0;

  // An empty page means the scroll is exhausted; looping on it guarantees
  // termination even if the reported total never matches the yielded count.
  while (body.hits.hits.length > 0) {
    for (const hit of body.hits.hits) {
      yield hit;
    }
    yielded += body.hits.hits.length;

    // `hits.total` is { value, relation } on Elasticsearch 7+ and a plain
    // number on older versions; accept both.
    const reported = body.hits.total;
    const total = reported !== null && typeof reported === 'object' ? reported.value : reported;

    // Everything reported has been yielded: skip the final round trip.
    if (yielded >= total) {
      return;
    }

    ({ body } = await elasticClient.scroll({
      scrollId: body._scroll_id,
      scroll: searchQuery.scroll
    }));
  }
}
然后是你的查询
// Request body for the search: match every document.
// (The original line carried one closing brace too many, which is a syntax error.)
const body = { query: { "match_all": {} } };

// Consume the async generator; pages are fetched on demand while iterating.
for await (const record of getRecords(elasticClient, { index: 'test', scroll: '30s', size: 100, body })) {
  console.log(record);
}